python - 如何使用 asyncio 从使用 SubprocessProtocol 的子进程中读取并在任意点终止该子进程?
问题描述
使用此处的答案作为基础(使用SubprocessProtocol
),我只是尝试从子进程中读取并在我选择的点停止读取(并终止子进程)(例如,我已经读取了足够的数据)。
请注意,我确实希望使用run_until_complete
per another discussion的好处。
我碰巧使用的是 Windows,下面的示例使用cat
的是 Cygwin。我使用的实际实用程序只是一个本机 Windows 控制台应用程序 - 但它会一直流式传输,直到手动关闭。
我可以很好地读取数据,但是我尝试停止读取并关闭子进程(例如,从内部调用 loop.stop() pipe_data_received()
)会导致异常(RuntimeError: Event loop is closed
和ValueError: I/O operation on closed pipe
)。我想立即优雅地终止子进程。
我不认为这是一个平台,因为我没有看到在哪里正确地中断事情以获得预期的效果。关于如何做到这一点的任何想法?
我的 Python 3.7+ 代码(根据示例修改):
import asyncio
import os
external_program = "cat" # Something that will output to stdio
external_option = "a" # An arbitrarily large amount of data
saved_data = []
class SubprocessProtocol(asyncio.SubprocessProtocol):
def pipe_data_received(self, fd, data):
if fd == 1: # got stdout data (bytes)
data_len = len(data)
print(''.join(' {:02x}'.format(x) for x in data), flush=True)
saved_data.extend(data)
if len(saved_data) > 512: # Stop once we've read this much data
loop.call_soon_threadsafe(loop.stop)
def connection_lost(self, exc):
print("Connection lost")
loop.stop() # end loop.run_forever()
print("START")
if os.name == 'nt':
# On Windows, the ProactorEventLoop is necessary to listen on pipes
loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
asyncio.set_event_loop(loop)
else:
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
loop.subprocess_exec(
SubprocessProtocol,
external_program,
external_option,
)
)
loop.run_forever()
finally:
loop.close()
print("DONE")
loop.close()
解决方案
不是 asyncio 专家,但这样的东西应该可以工作。
import time
import asyncio
import threading
class SubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, loop):
self.transport = None
self.loop = loop
def pipe_data_received(self, fd, data):
print('data received')
def connection_lost(self, exc):
print("Connection lost")
def connection_made(self, transport):
print("Connection made")
self.transport = transport
# becasue calc won't call pipe_data_received method.
t = threading.Thread(target=self._endme)
t.setDaemon(True)
t.start()
def _endme(self):
time.sleep(2)
# You'd normally use these inside pipe_data_received, connection_lost methods
self.transport.close()
self.loop.stop()
def main():
loop = asyncio.ProactorEventLoop()
asyncio.set_event_loop(loop)
loop.run_until_complete(loop.subprocess_exec(
lambda: SubprocessProtocol(loop),
'calc.exe'
))
loop.run_forever()
loop.close()
if __name__ == "__main__":
main()
推荐阅读
- amazon-s3 - laravel 存储中的 AWS S3 存储桶仍然导致 401
- javascript - 每分钟运行功能并发送消息
- android - CameraX PreviewView 到位图而不是文件
- node.js - proto3 文件格式数据类型错误中的问题
- firebase - 使用 Coroutines 测试 Firebase 的使用创建用户函数并调用 await() 会导致“此作业尚未完成”
- c# - 在后台线程中从 UserControl 生成 Png
- c# - 在 Neodynamic ThermalLabel SDK [C#] 中作为流导出到 PDF 的问题
- wxmaxima - wxMaxima 从“solve”中提取解决方案
- excel - 复制并粘贴过滤列中的唯一值
- amazon-web-services - Cloudformation 自动分配运营商 IP