首页 > 解决方案 > 子进程异常终止如何处理?

问题描述

我正在使用 python 3.7 并遵循此文档。我想要一个进程,它应该产生一个子进程,等待它完成任务,然后获取一些信息。我使用以下代码:

if __name__ == '__main__':
    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    print q.get()
    p.join()

当子进程正确完成时,没有问题,并且效果很好,但是当我的子进程在完成之前终止时,问题就开始了。在这种情况下,我的应用程序正在等待。

给超时q.get()p.join()不是完全解决问题,因为我想立即知道子进程死亡而不是等待超时。

另一个问题是超时会q.get()产生异常,我更愿意避免这种情况。

有人可以建议我一种更优雅的方式来克服这些问题吗?

标签: pythonprocessmultiprocessingqueuepython-multiprocessing

解决方案


队列和信号

一种可能性是注册一个信号处理程序并使用它来传递一个标记值。在 Unix 上,您可以SIGCHLD在父级中处理,但这不是您的选择。根据信号模块

在 Windows 上,signal() 只能与 SIGABRT、SIGFPE、SIGILL、SIGINT、SIGSEGV、SIGTERM 或 SIGBREAK 一起调用。

不确定通过任务管理器杀死它是否会转化为SIGTERM,但你可以试一试。

对于处理SIGTERM,您需要在孩子中注册信号处理程序。

import os
import sys
import time
import signal
from functools import partial
from multiprocessing import Process, Queue

SENTINEL = None


def _sigterm_handler(signum, frame, queue):
    print("received SIGTERM")
    queue.put(SENTINEL)
    sys.exit()


def register_sigterm(queue):
    global _sigterm_handler
    _sigterm_handler = partial(_sigterm_handler, queue=queue)
    signal.signal(signal.SIGTERM, _sigterm_handler)


def some_func(q):
    register_sigterm(q)
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()
    for msg in iter(q.get, SENTINEL):
        print(msg)
    p.join()

示例输出:

12273
msg_0
msg_1
msg_2
msg_3
received SIGTERM

Process finished with exit code 0

队列进程.is_alive()

即使这适用于任务管理器,你的用例听起来你不能排除强制杀戮,所以我认为你最好使用不依赖信号的方法。

如果您的 process p.is_alive()queue.get()使用timeout指定的调用并处理Empty异常,您可以检查循环:

import os
import time
from queue import Empty
from multiprocessing import Process, Queue

def some_func(q):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        q.put(f'msg_{i}')


if __name__ == '__main__':

    q = Queue()
    p = Process(target=some_func, args=(q,))
    p.start()

    while p.is_alive():
        try:
            msg = q.get(timeout=0.1)
        except Empty:
            pass
        else:
            print(msg)

    p.join()

也可以避免异常,但我不建议这样做,因为您不会将等待时间花在“排队”上,因此会降低响应能力:

while p.is_alive():
    if not q.empty():
        msg = q.get_nowait()
        print(msg)
        time.sleep(0.1)

管道流程.is_alive()

如果您打算为每个孩子使用一个连接,则可以使用管道而不是队列。它比队列(安装在管道顶部)性能更高,您可以使用multiprocessing.connection.wait(Python 3.3+)一次等待多个对象的就绪状态。

multiprocessing.connection.wait(object_list, timeout=None)

等到 object_list 中的对象准备好。返回 object_list 中准备好的那些对象的列表。如果 timeout 是一个浮点数,那么调用最多会阻塞那么多秒。如果 timeout 是 None 那么它将无限期地阻塞。负超时等效于零超时。

对于 Unix 和 Windows,如果一个对象是可读的 Connection 对象,它就可以出现在 object_list 中;一个已连接且可读的 socket.socket 对象;或 Process 对象的 sentinel 属性。当有数据可供读取或另一端已关闭时,连接或套接字对象已准备就绪。

Unix:wait(object_list, timeout) 几乎等同于 select.select(object_list, [], [], timeout)。不同之处在于,如果 select.select() 被信号中断,它会引发 OSError,错误号为 EINTR,而 wait() 不会。

Windows:object_list 中的项目必须是可等待的整数句柄(根据 Win32 函数 WaitForMultipleObjects() 的文档使用的定义),或者它可以是具有返回套接字句柄的 fileno() 方法的对象,或者管柄。(请注意,管道句柄和套接字句柄不是可等待句柄。)

您可以使用它来同时等待进程的哨兵属性和管道的父端。

import os
import time
from multiprocessing import Process, Pipe
from multiprocessing.connection import wait


def some_func(conn_write):
    print(os.getpid())
    for i in range(30):
        time.sleep(1)
        conn_write.send(f'msg_{i}')


if __name__ == '__main__':

    conn_read, conn_write = Pipe(duplex=False)
    p = Process(target=some_func, args=(conn_write,))
    p.start()

    while p.is_alive():
        wait([p.sentinel, conn_read])  # block-wait until something gets ready
        if conn_read.poll():  # check if something can be received
            print(conn_read.recv())
    p.join()

推荐阅读