python - 获取 asyncio.Queue 中的所有项目并返回它们
问题描述
你如何从一个asyncio.Queue
实例中收集所有项目并将它们作为结果返回?
注意事项:
- 放入队列的项目总数无法提前知道(但它是有限的,并且可以放入内存中)。
- 多个生产者可以将项目添加到队列中;生产者自己不一定知道所有项目何时都已添加到队列中。
我为消费者看到的示例不必asyncio.Queue
收集并返回结果;他们使用队列中的项目但没有返回值。他们依靠副作用来完成工作,而不关心返回结果。
具体来说,下面是一个简单的例子。我弄清楚如何完成这项工作的唯一方法是提供一个输出参数/输出参数,调用items
该协程从队列中收集结果:
import asyncio
import random
async def add_queue_item(item, queue):
# simulate some work
sleep_interval = random.randint(0, 3)
await asyncio.sleep(sleep_interval)
output_item = item + 1
await queue.put(output_item)
async def get_all_queue_items(queue, items):
while True:
items.append(await queue.get())
queue.task_done()
async def main():
queue = asyncio.Queue()
items = []
producer_tasks = [asyncio.create_task(add_queue_item(item, queue)) for item in range(5)]
collect_queue_items_task = asyncio.create_task(get_all_queue_items(queue, items))
await queue.join()
await asyncio.gather(*producer_tasks)
collect_queue_items_task.cancel()
print(items)
assert sorted(items) == [1, 2, 3, 4, 5]
asyncio.run(main())
有没有办法在get_all_queue_items
上面实现,这样我们就可以await <something>
得到所有的项目——明确什么是预期的?IE,
…
await queue.join()
await asyncio.gather(*producer_tasks)
items = await <something>
print(items)
assert sorted(items) == [1, 2, 3, 4, 5]
解决方案
我能够使用哨兵值获得一个实现,以提醒消费者get_all_queue_items
队列中没有更多的值,从而将其从循环中断开。get_all_queue_items
可以等待计划的任务并将收集的项目包含在其中。
import asyncio
import random
SENTINEL = object()
async def add_queue_item(item, queue):
# simulate some work
sleep_interval = random.randint(1, 3)
await asyncio.sleep(sleep_interval)
output_item = item + 1
await queue.put(output_item)
async def get_all_queue_items(queue):
items = []
item = await queue.get()
while item is not SENTINEL:
items.append(item)
queue.task_done()
item = await queue.get()
queue.task_done()
return items
async def main():
queue = asyncio.Queue()
producer_tasks = [asyncio.create_task(add_queue_item(item, queue)) for item in range(5)]
collect_queue_items_task = asyncio.create_task(get_all_queue_items(queue))
await asyncio.gather(*producer_tasks)
await queue.put(SENTINEL)
await queue.join()
items = await collect_queue_items_task
print(items)
assert sorted(items) == [1, 2, 3, 4, 5]
asyncio.run(main())
推荐阅读
- c# - C#:如何确定被触发的事件的名称及其事件/委托类型?
- r - 在 R 的循环输出中有额外的值
- arrays - 为什么以及何时在 Swift 中对 Array 使用惰性?
- reactjs - React Navigation - DrawerNavigator 中跨屏幕的通用组件
- wordpress - Woocoomerce - 将类添加到输入标签
- node.js - nodejs request.post 无法获取cookies,但是request.get 是可以的。
- apache - 使用 WampServer 拒绝访问网络上另一台计算机上的端口 443 (https://)
- r - 重新排序,排除一列并将其他列保留在 R 中?
- c# - WPF 双向绑定 DataGrid 中的 WinForms 控件
- java - Spring Boot Startup 卡在 - INFO: Using a shared selector for servlet write/read