python - await queue.put(item) 到 asyncio.Queue 似乎没有释放对事件循环的控制
问题描述
在这个简单的生产者/消费者示例中,就好像await queue.put(item)
没有释放事件循环以允许消费者运行直到完成。这导致生产者将其所有项目放入队列中,然后消费者才能将它们取出。
这是预期的吗?
await queue.put(item)
如果我遵循with ,我会得到我正在寻找的结果await asyncio.sleep(0)
。
生产者然后将 1 个项目放入队列,消费者然后从队列中取出 1 个项目。
我在 Python 3.6.8 和 3.7.2 中得到了相同的结果。
import asyncio
async def produce(queue, n):
for x in range(1, n + 1):
print('producing {}/{}'.format(x, n))
item = str(x)
await queue.put(item)
# await asyncio.sleep(0)
await queue.put(None)
async def consume(queue):
while True:
item = await queue.get()
if item is None:
break
print('consuming item {}...'.format(item))
loop = asyncio.get_event_loop()
queue = asyncio.Queue(loop=loop)
producer_coro = produce(queue, 10)
consumer_coro = consume(queue)
loop.run_until_complete(asyncio.gather(producer_coro, consumer_coro))
loop.close()
解决方案
这导致生产者将其所有项目放入队列中,然后消费者才能将它们取出。这是预期的吗?
是的。问题是您的队列是无界的,因此将某些内容放入其中永远不会暂停生产者,因此永远不会屈服于其他协程。所有立即提供数据的等待也是如此,例如在 EOF 处读取。
如果生产者的循环包含另一个暂停源,例如等待实际输入(毕竟它必须从某个地方获取项目),那么这将导致它暂停并且问题不会立即引起注意。强制暂停 usingasyncio.sleep(0)
也可以,但它很脆弱,因为它依赖于单个暂停来运行消费者。情况可能并非总是如此,因为消费者本身可能会等待队列以外的某些事件。
无界队列在某些情况下是有意义的,例如当队列预先填充了任务时,或者生产者的架构将项目的数量限制在合理的数量。但是如果队列项是动态生成的,最好加一个bound。界限保证了生产者的背压,并确保它不会垄断事件循环。
推荐阅读
- prestashop - 使用额外字段保存产品时出错 {“step1”:[“此表单不应包含额外字段。”]} prestashop 1.7.5
- web - 当我刷新页面时,我的待办事项列表消失了
- beautifulsoup - BeautifulSoup - .string 为空
- networking - 如何在网络上选择具有最大或最小中心度的海龟并为其分配特定状态
- java - SpringBoot:如何通过 RestTemplate HttpMethod GET 发送对象?
- git - 当我将功能分支重新设置为主分支与将主分支重新设置为功能时有什么区别,以及何时使用它们?
- excel - 从 Outlook 导入约会
- spring-boot - Zuul 网关总是返回状态 200 - docker compose
- wordpress - Wordpress - 插入 ACF php
- asynchronous - Celery 任务中的同步 IO 功能?