python - 子进程异常终止如何处理?
问题描述
我正在使用 python 3.7 并遵循此文档。我想要一个进程,它应该产生一个子进程,等待它完成任务,然后获取一些信息。我使用以下代码:
if __name__ == '__main__':
q = Queue()
p = Process(target=some_func, args=(q,))
p.start()
print q.get()
p.join()
当子进程正确完成时,没有问题,并且效果很好,但是当我的子进程在完成之前终止时,问题就开始了。在这种情况下,我的应用程序正在等待。
给超时q.get()
而p.join()
不是完全解决问题,因为我想立即知道子进程死亡而不是等待超时。
另一个问题是超时会q.get()
产生异常,我更愿意避免这种情况。
有人可以建议我一种更优雅的方式来克服这些问题吗?
解决方案
队列和信号
一种可能性是注册一个信号处理程序并使用它来传递一个标记值。在 Unix 上,您可以SIGCHLD
在父级中处理,但这不是您的选择。根据信号模块:
在 Windows 上,signal() 只能与 SIGABRT、SIGFPE、SIGILL、SIGINT、SIGSEGV、SIGTERM 或 SIGBREAK 一起调用。
不确定通过任务管理器杀死它是否会转化为SIGTERM
,但你可以试一试。
对于处理SIGTERM
,您需要在孩子中注册信号处理程序。
import os
import sys
import time
import signal
from functools import partial
from multiprocessing import Process, Queue
SENTINEL = None
def _sigterm_handler(signum, frame, queue):
print("received SIGTERM")
queue.put(SENTINEL)
sys.exit()
def register_sigterm(queue):
global _sigterm_handler
_sigterm_handler = partial(_sigterm_handler, queue=queue)
signal.signal(signal.SIGTERM, _sigterm_handler)
def some_func(q):
register_sigterm(q)
print(os.getpid())
for i in range(30):
time.sleep(1)
q.put(f'msg_{i}')
if __name__ == '__main__':
q = Queue()
p = Process(target=some_func, args=(q,))
p.start()
for msg in iter(q.get, SENTINEL):
print(msg)
p.join()
示例输出:
12273
msg_0
msg_1
msg_2
msg_3
received SIGTERM
Process finished with exit code 0
即使这适用于任务管理器,你的用例听起来你不能排除强制杀戮,所以我认为你最好使用不依赖信号的方法。
如果您的 process p.is_alive()
,queue.get()
使用timeout
指定的调用并处理Empty
异常,您可以检查循环:
import os
import time
from queue import Empty
from multiprocessing import Process, Queue
def some_func(q):
print(os.getpid())
for i in range(30):
time.sleep(1)
q.put(f'msg_{i}')
if __name__ == '__main__':
q = Queue()
p = Process(target=some_func, args=(q,))
p.start()
while p.is_alive():
try:
msg = q.get(timeout=0.1)
except Empty:
pass
else:
print(msg)
p.join()
也可以避免异常,但我不建议这样做,因为您不会将等待时间花在“排队”上,因此会降低响应能力:
while p.is_alive():
if not q.empty():
msg = q.get_nowait()
print(msg)
time.sleep(0.1)
如果您打算为每个孩子使用一个连接,则可以使用管道而不是队列。它比队列(安装在管道顶部)性能更高,您可以使用multiprocessing.connection.wait
(Python 3.3+)一次等待多个对象的就绪状态。
multiprocessing.connection.wait(object_list, timeout=None)
等到 object_list 中的对象准备好。返回 object_list 中准备好的那些对象的列表。如果 timeout 是一个浮点数,那么调用最多会阻塞那么多秒。如果 timeout 是 None 那么它将无限期地阻塞。负超时等效于零超时。
对于 Unix 和 Windows,如果一个对象是可读的 Connection 对象,它就可以出现在 object_list 中;一个已连接且可读的 socket.socket 对象;或 Process 对象的 sentinel 属性。当有数据可供读取或另一端已关闭时,连接或套接字对象已准备就绪。
Unix:wait(object_list, timeout) 几乎等同于 select.select(object_list, [], [], timeout)。不同之处在于,如果 select.select() 被信号中断,它会引发 OSError,错误号为 EINTR,而 wait() 不会。
Windows:object_list 中的项目必须是可等待的整数句柄(根据 Win32 函数 WaitForMultipleObjects() 的文档使用的定义),或者它可以是具有返回套接字句柄的 fileno() 方法的对象,或者管柄。(请注意,管道句柄和套接字句柄不是可等待句柄。)
您可以使用它来同时等待进程的哨兵属性和管道的父端。
import os
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait
def some_func(conn_write):
print(os.getpid())
for i in range(30):
time.sleep(1)
conn_write.send(f'msg_{i}')
if __name__ == '__main__':
conn_read, conn_write = Pipe(duplex=False)
p = Process(target=some_func, args=(conn_write,))
p.start()
while p.is_alive():
wait([p.sentinel, conn_read]) # block-wait until something gets ready
if conn_read.poll(): # check if something can be received
print(conn_read.recv())
p.join()
推荐阅读
- angular - 如何定制
颜色? - bash - 使用 Cloud Build 重启 GCE 实例上的服务
- python - 彩票循环通过列表中的随机数找到找到它需要多少计数
- asp.net - 我如何将链接按钮 oncommand 事件挂钩到另一种方法,比如 gridview OnPageIndexChanging 事件
- javascript - 有没有比异步函数回调更简单的方法来暂停执行?
- html - Angular 中反应式表单的验证器指令
- c# - 窗口最小化时如何强制刷新缩略图?(C#WPF)
- arrays - 有没有办法获取对象中属性值的数组?
- ios - 使用 Coordinator-Pattern 在几个 ViewController 之间传递数据和委托
- python - 从 Python 中的给定元素形成所有可能的唯一 3 位数字组合,