python - asyncio 子进程可以与 contextmanager 一起使用吗?
问题描述
在python(3.7+)中,我试图将一个子进程作为上下文管理器运行,同时异步流式传输潜在的大量标准输出。问题是我似乎无法让 contextmanager 的主体与 stdout 回调异步运行。我曾尝试使用线程,在那里运行异步函数,但后来我无法弄清楚如何将 Process 对象返回到上下文管理器中。
所以问题是:如何在主线程运行时从主线程中的上下文管理器产生异步进程对象?也就是说,我想在 open_subprocess() 在下面的代码中完成运行之前从它产生已经和当前正在运行的进程。
import asyncio
import contextlib
async def read_stream(proc, stream, callback):
while proc.returncode is None:
data = await stream.readline()
if data:
callback(data.decode().rstrip())
else:
break
async def stream_subprocess(cmd, *args, stdout_callback=print):
proc = await asyncio.create_subprocess_exec(
cmd,
*args,
stdout=asyncio.subprocess.PIPE)
read = read_stream(proc, proc.stdout, stdout_callback)
await asyncio.wait([read])
return proc
@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
proc_coroutine = stream_subprocess(
cmd,
*args,
stdout_callback=stdout_callback)
# The following blocks until proc has finished
# I would like to yield proc while it is running
proc = asyncio.run(proc_coroutine)
yield proc
proc.terminate()
if __name__ == '__main__':
import time
def stdout_callback(data):
print('STDOUT:', data)
with open_subprocess('ping', '-c', '4', 'localhost',
stdout_callback=stdout_callback) as proc:
# The following code only runs after proc completes
# but I would expect these print statements to
# be interleaved with the output from the subprocess
for i in range(2):
print(f'RUNNING SUBPROCESS {proc.pid}...')
time.sleep(1)
print(f'RETURN CODE: {proc.returncode}')
解决方案
Asyncio 通过暂停任何看起来可能会阻塞的东西来提供并行执行。为此,所有代码都必须在回调或协程内,并且避免调用阻塞函数,如time.sleep()
. 除此之外,您的代码还有一些其他问题,例如await asyncio.wait([x])
相当于await x
,这意味着在open_subprocess
完成所有流读取之前不会产生。
构建代码的正确方法是将顶级代码移动到async def
并使用异步上下文管理器。例如:
import asyncio
import contextlib
async def read_stream(proc, stream, callback):
while proc.returncode is None:
data = await stream.readline()
if data:
callback(data.decode().rstrip())
else:
break
@contextlib.asynccontextmanager
async def open_subprocess(cmd, *args, stdout_callback=print):
proc = await asyncio.create_subprocess_exec(
cmd, *args, stdout=asyncio.subprocess.PIPE)
asyncio.create_task(read_stream(proc, proc.stdout, stdout_callback))
yield proc
if proc.returncode is None:
proc.terminate()
await proc.wait()
async def main():
def stdout_callback(data):
print('STDOUT:', data)
async with open_subprocess('ping', '-c', '4', 'localhost',
stdout_callback=stdout_callback) as proc:
for i in range(2):
print(f'RUNNING SUBPROCESS {proc.pid}...')
await asyncio.sleep(1)
print(f'RETURN CODE: {proc.returncode}')
asyncio.run(main())
如果您坚持混合同步和异步代码,则需要通过在单独的线程中运行 asyncio 事件循环来完全分离它们。然后你的主线程将无法直接访问 asyncio 对象,proc
因为它们不是线程安全的。您需要始终如一地使用事件循环call_soon_threadsafe
并run_coroutine_threadsafe
与之通信。
这种方法很复杂,需要线程间通信和摆弄事件循环,所以除了作为学习练习外,我不建议这样做。更不用说如果你使用另一个线程,你根本不需要 asyncio ——你可以直接在另一个线程中发出同步调用。但是话虽如此,这里有一个可能的实现:
import asyncio
import contextlib
import concurrent.futures
import threading
async def read_stream(proc, stream, callback):
while proc.returncode is None:
data = await stream.readline()
if data:
callback(data.decode().rstrip())
else:
break
async def stream_subprocess(cmd, *args, proc_data_future, stdout_callback=print):
try:
proc = await asyncio.create_subprocess_exec(
cmd, *args, stdout=asyncio.subprocess.PIPE)
except Exception as e:
proc_data_future.set_exception(e)
raise
proc_data_future.set_result({'proc': proc, 'pid': proc.pid})
await read_stream(proc, proc.stdout, stdout_callback)
return proc
@contextlib.contextmanager
def open_subprocess(cmd, *args, stdout_callback=print):
loop = asyncio.new_event_loop()
# needed to use asyncio.subprocess outside the main thread
asyncio.get_child_watcher().attach_loop(loop)
threading.Thread(target=loop.run_forever).start()
proc_data_future = concurrent.futures.Future()
loop.call_soon_threadsafe(
loop.create_task,
stream_subprocess(cmd, *args,
proc_data_future=proc_data_future,
stdout_callback=stdout_callback))
proc_data = proc_data_future.result()
yield proc_data
async def terminate(proc):
if proc.returncode is None:
proc.terminate()
await proc.wait()
asyncio.run_coroutine_threadsafe(terminate(proc_data['proc']), loop).result()
proc_data['returncode'] = proc_data['proc'].returncode
loop.call_soon_threadsafe(loop.stop)
if __name__ == '__main__':
import time
def stdout_callback(data):
print('STDOUT:', data)
with open_subprocess('ping', '-c', '4', 'localhost',
stdout_callback=stdout_callback) as proc_data:
for i in range(2):
print(f'RUNNING SUBPROCESS {proc_data["pid"]}...')
time.sleep(1)
print(f'RETURN CODE: {proc_data["returncode"]}')
推荐阅读
- android - 选项卡布局可能导致内存泄漏
- c# - 应该兼容的不兼容的 .NET Standard 程序集?
- node.js - 无法在节点 js 中显示静态文件夹中的图像
- mysql - Symfony 4 SQLSTATE [42000] 指定的键太长
- ios - 您如何控制要下载的图像数量?
- ios - 自定义标头在发送到服务器时转换为小写(使用 NSURLSession)
- flutter - 如何实现一次只刷一张卡以消除颤动的动作?
- python - 如何在 Databricks 中引用笔记本的路径/%run 在做什么?
- c++ - 如何实现通用 C++ 来配置功能?
- javascript - 将多维字符串转换为数组 Javascript