python - AsyncGenerator 上的异步 for 循环
问题描述
拥有一个异步生成器,我希望能够异步迭代它。但是,我遗漏了一些东西或弄乱了一些东西,或者两者兼而有之,因为我最终得到了一个常规的同步 for 循环:
import asyncio
async def time_consuming(t):
print(f"Going to sleep for {t} seconds")
await asyncio.sleep(t)
print(f"Slept {t} seconds")
return t
async def generator():
for i in range(4, 0, -1):
yield await time_consuming(i)
async def consumer():
async for t in generator():
print(f"Doing something with {t}")
if __name__ == '__main__':
loop = asyncio.new_event_loop()
loop.run_until_complete(consumer())
loop.close()
这将需要大约 12 秒的时间来运行并返回:
Going to sleep for 4 seconds
Slept 4 seconds
Doing something with 4
Going to sleep for 3 seconds
Slept 3 seconds
Doing something with 3
Going to sleep for 2 seconds
Slept 2 seconds
Doing something with 2
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1
虽然我预计它需要大约 4 秒才能运行并返回如下内容:
Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 4 seconds
Doing something with 4
Slept 3 seconds
Doing something with 3
Slept 2 seconds
Doing something with 2
Slept 1 seconds
Doing something with 1
解决方案
异步生成器并不意味着您同时执行迭代!您所获得的只是协程有更多的空间让步给其他任务。迭代步骤仍然连续运行。
换句话说:异步迭代器对于需要使用 I/O 来获取每个迭代步骤的迭代器很有用。考虑循环访问 Web 套接字的结果或文件中的行。如果next()
迭代器的每一步都需要等待一个缓慢的 I/O 源来提供数据,那么这是将控制权交给已设置为并发运行的其他东西的好点。
如果您希望生成器的每个单独步骤同时运行,那么您仍然必须使用事件循环显式地安排其他任务。
当所有这些额外任务完成后,您就可以从生成器返回。如果您将 4 个time_consuming()
协程安排为任务,用于asyncio.wait()
等待一个或所有任务完成,并从已完成的任务中产生结果,那么是的,在您的for i in range(...):
循环完成后,您的过程总共只需要 4 秒:
async def generator():
pending = []
for i in range(4, 0, -1):
pending.append(asyncio.create_task(time_consuming(i)))
while pending:
done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
for task in done:
yield task.result()
此时输出变为
Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Doing something with 1
Slept 2 seconds
Doing something with 2
Slept 3 seconds
Doing something with 3
Slept 4 seconds
Doing something with 4
请注意,这是与预期输出相反的顺序,因为这会在任务完成时获取任务结果,而不是等待创建的第一个任务完成。通常这就是你想要的,真的。当您在 1 之后已经准备好结果时,为什么还要等待 4 秒?
你也可以有你的变体,但你只是用不同的方式编码。然后你可以只使用asyncio.gather()
4 个任务,它安排一堆协程作为并发任务运行,并将它们的结果作为列表返回,之后你可以产生这些结果:
async def generator():
tasks = []
for i in range(4, 0, -1):
tasks.append(time_consuming(i))
for res in await asyncio.gather(*tasks):
yield res
但现在输出变成
Going to sleep for 4 seconds
Going to sleep for 3 seconds
Going to sleep for 2 seconds
Going to sleep for 1 seconds
Slept 1 seconds
Slept 2 seconds
Slept 3 seconds
Slept 4 seconds
Doing something with 4
Doing something with 3
Doing something with 2
Doing something with 1
因为在最长的任务 , 完成之前我们不能做任何进一步的事情time_consuming(4)
,但是运行时间较短的任务在此之前完成并且已经输出了它们的Slept ... seconds
消息。
推荐阅读
- java - 在 Tomcat 服务器上使用 POST 方法从 jsp 到 jsp 的编码错误
- android - 无法在视频上使用 FFmpeg 绘制文本
- javascript - 如何使用我添加的外部脚本来响应 JS?
- elasticsearch - 在批量索引期间检测更改
- regex - 正则表达式逻辑
- node.js - Node js上的Heroku登录问题
- android - TextView 未以编程方式添加到我的布局中
- peoplesoft - 查找 SendMail 人员代码
- json - 用 jq 解析一组 JSON 对象
- typescript - 带有自定义 equals 方法的 TypeScript Set