Can an asyncio subprocess be used with a contextmanager?

Problem description

In Python (3.7+), I'm trying to run a subprocess as a context manager while asynchronously streaming a potentially large amount of its stdout. The problem is that I can't seem to get the body of the context manager to run concurrently with the stdout callback. I've tried using a thread and running the async functions there, but then I couldn't figure out how to get the Process object back into the context manager.

So the question is: how do I yield an asyncio Process object from a context manager in the main thread while the process keeps running? In other words, in the code below I want open_subprocess() to yield the already-started, still-running process before it has finished running.

import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

async def stream_subprocess(cmd, *args, stdout_callback=print):
    proc = await asyncio.create_subprocess_exec(
        cmd,
        *args,
        stdout=asyncio.subprocess.PIPE)
    read = read_stream(proc, proc.stdout, stdout_callback)
    await asyncio.wait([read])
    return proc

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    proc_coroutine = stream_subprocess(
        cmd,
        *args,
        stdout_callback=stdout_callback)
    # The following blocks until proc has finished
    # I would like to yield proc while it is running
    proc = asyncio.run(proc_coroutine)
    yield proc
    proc.terminate()

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc:
        # The following code only runs after proc completes
        # but I would expect these print statements to
        # be interleaved with the output from the subprocess
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            time.sleep(1)

    print(f'RETURN CODE: {proc.returncode}')

Tags: python, python-3.x, subprocess, python-asyncio, contextmanager

Solution


Asyncio provides parallel execution by suspending anything that looks like it might block. For that to work, all the code must live inside callbacks or coroutines and refrain from calling blocking functions such as time.sleep(). Beyond that, your code has some other issues; for example, await asyncio.wait([x]) is equivalent to await x, which means open_subprocess won't yield until all of the stream reading has finished.
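That pitfall can be seen in isolation. The sketch below (illustrative names, not from the original post) uses a `worker` coroutine as a stand-in for the stream reader: awaiting it directly suspends the caller until it finishes, whereas scheduling it with asyncio.create_task() lets the body run first.

```python
import asyncio

async def worker(results):
    # Stand-in for a long-running stream reader.
    await asyncio.sleep(0.1)
    results.append('worker done')

async def main():
    results = []
    # `await asyncio.wait([worker(results)])` would behave like
    # `await worker(results)`: main() suspends until the worker finishes.
    # Scheduling it as a task lets the code below run first instead.
    task = asyncio.create_task(worker(results))
    results.append('body running')  # runs before the worker completes
    await task
    return results

print(asyncio.run(main()))  # ['body running', 'worker done']
```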

The correct way to structure the code is to move the top-level code into an async def and use an async context manager. For example:

import asyncio
import contextlib

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    proc = await asyncio.create_subprocess_exec(
        cmd, *args, stdout=asyncio.subprocess.PIPE)
    asyncio.create_task(read_stream(proc, proc.stdout, stdout_callback))
    yield proc
    if proc.returncode is None:
        proc.terminate()
        await proc.wait()

async def main():
    def stdout_callback(data):
        print('STDOUT:', data)

    async with open_subprocess('ping', '-c', '4', 'localhost',
                               stdout_callback=stdout_callback) as proc:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc.pid}...')
            await asyncio.sleep(1)

    print(f'RETURN CODE: {proc.returncode}')

asyncio.run(main())
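One refinement worth considering (an addition, not part of the original answer): if the `async with` body can raise, wrapping the yield in try/finally ensures the subprocess is still terminated and reaped. A sketch under that assumption:

```python
import asyncio
import contextlib

# Variant of the async context manager above with exception-safe cleanup:
# the finally clause runs even if the `async with` body raises.
@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
    proc = await asyncio.create_subprocess_exec(
        cmd, *args, stdout=asyncio.subprocess.PIPE)

    async def read_stdout():
        while True:
            data = await proc.stdout.readline()
            if not data:
                break
            stdout_callback(data.decode().rstrip())

    reader = asyncio.create_task(read_stdout())
    try:
        yield proc
    finally:
        if proc.returncode is None:
            proc.terminate()
        await proc.wait()
        await reader  # pipe EOF ends the reader once the process exits
```

Awaiting the reader task in the finally clause also avoids a "task was destroyed but it is pending" warning when the loop shuts down.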

If you insist on mixing sync and async code, you will need to separate them completely by running the asyncio event loop in a separate thread. Your main thread then cannot access asyncio objects such as proc directly, because they are not thread-safe; it must consistently communicate with the event loop using call_soon_threadsafe and run_coroutine_threadsafe.
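The essence of that pattern fits in a few lines. This minimal sketch (illustrative names; `compute` stands in for real async work) runs the loop in a dedicated thread and shows that run_coroutine_threadsafe hands back a concurrent.futures.Future the synchronous caller can block on:

```python
import asyncio
import threading

# The event loop lives in its own thread; the synchronous main thread
# talks to it only via run_coroutine_threadsafe / call_soon_threadsafe.
loop = asyncio.new_event_loop()
threading.Thread(target=loop.run_forever, daemon=True).start()

async def compute(x):
    await asyncio.sleep(0.01)  # stand-in for real asynchronous work
    return x * 2

# run_coroutine_threadsafe returns a concurrent.futures.Future, so the
# synchronous caller can simply block on .result().
future = asyncio.run_coroutine_threadsafe(compute(21), loop)
result = future.result()
print(result)  # 42

loop.call_soon_threadsafe(loop.stop)
```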

This approach is complicated, requiring inter-thread communication and fiddling with the event loop, so I wouldn't recommend it except as a learning exercise. Not to mention that if you're using another thread, you don't really need asyncio at all: you could just issue synchronous calls in the other thread. But with that said, here is a possible implementation:

import asyncio
import contextlib
import concurrent.futures
import threading

async def read_stream(proc, stream, callback):
    while proc.returncode is None:
        data = await stream.readline()
        if data:
            callback(data.decode().rstrip())
        else:
            break

async def stream_subprocess(cmd, *args, proc_data_future, stdout_callback=print):
    try:
        proc = await asyncio.create_subprocess_exec(
            cmd, *args, stdout=asyncio.subprocess.PIPE)
    except Exception as e:
        proc_data_future.set_exception(e)
        raise
    proc_data_future.set_result({'proc': proc, 'pid': proc.pid})
    await read_stream(proc, proc.stdout, stdout_callback)
    return proc

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    loop = asyncio.new_event_loop()
    # needed to use asyncio.subprocess outside the main thread
    asyncio.get_child_watcher().attach_loop(loop)
    threading.Thread(target=loop.run_forever).start()
    proc_data_future = concurrent.futures.Future()
    loop.call_soon_threadsafe(
        loop.create_task,
        stream_subprocess(cmd, *args,
                          proc_data_future=proc_data_future,
                          stdout_callback=stdout_callback))
    proc_data = proc_data_future.result()
    yield proc_data
    async def terminate(proc):
        if proc.returncode is None:
            proc.terminate()
            await proc.wait()
    asyncio.run_coroutine_threadsafe(terminate(proc_data['proc']), loop).result()
    proc_data['returncode'] = proc_data['proc'].returncode
    loop.call_soon_threadsafe(loop.stop)

if __name__ == '__main__':
    import time

    def stdout_callback(data):
        print('STDOUT:', data)

    with open_subprocess('ping', '-c', '4', 'localhost',
                         stdout_callback=stdout_callback) as proc_data:
        for i in range(2):
            print(f'RUNNING SUBPROCESS {proc_data["pid"]}...')
            time.sleep(1)

    print(f'RETURN CODE: {proc_data["returncode"]}')
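A possible simplification, not from the original answer: since Python 3.8 the default child watcher (ThreadedChildWatcher) works outside the main thread, and asyncio.get_child_watcher() is deprecated as of 3.12, so on newer versions you can let asyncio.run manage the loop inside the worker thread instead of wiring up run_forever and the watcher by hand. A sketch under those assumptions (note the caveat in the comments: asyncio objects are not generally thread-safe, and terminate() works here only because it merely sends an OS signal):

```python
import asyncio
import concurrent.futures
import contextlib
import threading

async def run_and_stream(cmd, *args, proc_future, stdout_callback=print):
    # Start the subprocess and hand the Process object back to the
    # waiting main thread through a concurrent.futures.Future.
    try:
        proc = await asyncio.create_subprocess_exec(
            cmd, *args, stdout=asyncio.subprocess.PIPE)
    except Exception as e:
        proc_future.set_exception(e)
        raise
    proc_future.set_result(proc)
    while True:
        data = await proc.stdout.readline()
        if not data:
            break
        stdout_callback(data.decode().rstrip())
    await proc.wait()

@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
    proc_future = concurrent.futures.Future()
    # asyncio.run handles loop creation and teardown inside the thread.
    thread = threading.Thread(
        target=asyncio.run,
        args=(run_and_stream(cmd, *args, proc_future=proc_future,
                             stdout_callback=stdout_callback),))
    thread.start()
    proc = proc_future.result()  # blocks until the subprocess has started
    try:
        yield proc
    finally:
        if proc.returncode is None:
            # terminate() only sends a signal to the OS process, which is
            # safe from another thread in practice; do not touch other
            # asyncio state on `proc` from this thread.
            proc.terminate()
        thread.join()
```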
