python - 从异步子进程读取流输出
问题描述
我正在尝试从子进程中运行的程序读取 URL,然后安排异步 HTTP 请求,但看起来请求正在同步运行。那是因为子进程和请求都在同一个协程函数中运行吗?
测试.py
import random
import time
URLS = ['http://example.com', 'http://example.com/sleep5s']
def main():
for url in random.choices(URLS, weights=(1, 1), k=5):
print(url)
time.sleep(random.uniform(0.5, 1))
if __name__ == '__main__':
main()
主文件
import asyncio
import sys
import httpx
from httpx.exceptions import TimeoutException
async def req(url):
async with httpx.AsyncClient() as client:
try:
r = await client.get(url, timeout=2)
print(f'Response {url}: {r.status_code}')
except Exception as TimeoutException:
print(f'TIMEOUT - {url}')
except Exception as exc:
print(f'ERROR - {url}')
async def run():
proc = await asyncio.create_subprocess_exec(
sys.executable,
'-u',
'test.py',
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
while True:
line = await proc.stdout.readline()
if not line:
break
url = line.decode().rstrip()
print(f'Found URL: {url}')
resp = await req(url)
await proc.wait()
async def main():
await run()
if __name__ == '__main__':
asyncio.run(main())
测试
$ python main.py
Found URL: http://example.com
Response http://example.com: 200
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
Found URL: http://example.com
Response http://example.com: 200
Found URL: http://example.com/sleep5s
TIMEOUT - http://example.com/sleep5s
解决方案
看起来请求正在同步运行。那是因为子进程和请求都在同一个协程函数中运行吗?
你的诊断是正确的。await
意思是它在罐子上说的:协程在给你结果之前不会继续。幸运的是,asyncio 使得在后台运行协程变得容易:
tasks = []
while True:
line = await proc.stdout.readline()
if not line:
break
url = line.decode().rstrip()
print(f'Found URL: {url}')
tasks.append(asyncio.create_task(req(url)))
resps = asyncio.gather(*tasks)
await proc.wait()
注意:
asyncio.create_task()
确保即使我们仍在阅读这些行,请求也开始被处理asyncio.gather()
确保在协程完成之前实际上等待所有任务。它还提供对响应的访问并传播异常(如果有)。
推荐阅读
- kubernetes - 使用 kubernetes 和 Logstash 的 Permission Denied 错误
- linux - 对两个不同的 ssh auth 文件使用相同的密码
- yaml - 找不到文件 /dataproc/v1/PropertiesValue.yaml
- c# - 如何正确停止异步 TcpListener?
- merge - 尝试使用 Merge 更新 IBM DB2 中的表时出错
- node.js - 如何从浏览器中删除我网站的源代码
- python-3.x - 如何从使用 Flask 实现的 REST API 获取更新(响应)?如果不可能,是否有任何解决方法?
- flutter - 如何发回数据并导航到特定屏幕?[扑]
- javascript - 为什么尝试将元素附加到文档时出现错误?
- unity3d - 我想在数组中实例化预制件后存储它们