运行多个线程 - AWS Panorama

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

运行多个线程

您可以运行应用程序逻辑处理线程,也可以使用其他线程处理其他后台进程。例如,您可以创建一个线程为调试提供 HTTP 流量,或创建一个线程以监控推理结果并将数据发送到 AWS。

要运行多个线程,可以使用 Python 标准库中的线程模块为每个进程创建一个线程。以下示例显示了调试服务器示例应用程序的主循环,该循环创建了一个应用程序对象并使用它来运行三个线程。

packages/123456789012-DEBUG_SERVER-1.0/application.py - 主循环
def main(): panorama = panoramasdk.node() while True: try: # Instantiate application logger.info('INITIALIZING APPLICATION') app = Application(panorama) # Create threads for stream processing, debugger, and client app.run_thread = threading.Thread(target=app.run_cv) app.server_thread = threading.Thread(target=app.run_debugger) app.client_thread = threading.Thread(target=app.run_client) # Start threads logger.info('RUNNING APPLICATION') app.run_thread.start() logger.info('RUNNING SERVER') app.server_thread.start() logger.info('RUNNING CLIENT') app.client_thread.start() # Wait for threads to exit app.run_thread.join() app.server_thread.join() app.client_thread.join() logger.info('RESTARTING APPLICATION') except: logger.exception('Exception during processing loop.')

当所有线程都退出时,应用程序会自行重新启动。run_cv 循环处理来自摄像机视频流的图像。如果它收到停止信号,就会关闭调试程序进程,而调试程序进程在 HTTP 服务器上运行,无法自行关闭。每个线程必须处理各自的错误。如果未捕获并记录错误,则线程将以静默方式退出。

packages/123456789012-DEBUG_SERVER-1.0/application.py - 处理循环
# Processing loop def run_cv(self): """Run computer vision workflow in a loop.""" logger.info("PROCESSING STREAMS") while not self.terminate: try: self.process_streams() # turn off debug logging after 15 loops if logger.getEffectiveLevel() == logging.DEBUG and self.frame_num == 15: logger.setLevel(logging.INFO) except: logger.exception('Exception on processing thread.') # Stop signal received logger.info("SHUTTING DOWN SERVER") self.server.shutdown() self.server.server_close() logger.info("EXITING RUN THREAD")

线程通过应用程序的 self 对象进行通信。要重新启动应用程序处理循环,调试程序线程会调用 stop 方法。此方法设置了一个 terminate 属性,向其他线程发出关闭信号。

packages/123456789012-DEBUG_SERVER-1.0/application.py - 停止方法
# Interrupt processing loop def stop(self): """Signal application to stop processing.""" logger.info("STOPPING APPLICATION") # Signal processes to stop self.terminate = True # HTTP debug server def run_debugger(self): """Process debug commands from local network.""" class ServerHandler(SimpleHTTPRequestHandler): # Store reference to application application = self # Get status def do_GET(self): """Process GET requests.""" logger.info('Get request to {}'.format(self.path)) if self.path == "/status": self.send_200('OK') else: self.send_error(400) # Restart application def do_POST(self): """Process POST requests.""" logger.info('Post request to {}'.format(self.path)) if self.path == '/restart': self.send_200('OK') ServerHandler.application.stop() else: self.send_error(400)