首页 > 解决方案 > 获取 asyncio.Queue 中的所有项目并返回它们

问题描述

你如何从一个asyncio.Queue实例中收集所有项目并将它们作为结果返回?

注意事项:

我为消费者看到的示例不必asyncio.Queue收集并返回结果;他们使用队列中的项目但没有返回值。他们依靠副作用来完成工作,而不关心返回结果。

具体来说,下面是一个简单的例子。我弄清楚如何完成这项工作的唯一方法是提供一个输出参数/输出参数,调用items该协程从队列中收集结果:

import asyncio
import random


async def add_queue_item(item, queue):
    # simulate some work
    sleep_interval = random.randint(0, 3)
    await asyncio.sleep(sleep_interval)
    output_item = item + 1
    await queue.put(output_item)


async def get_all_queue_items(queue, items):
    while True:
        items.append(await queue.get())
        queue.task_done()


async def main():
    queue = asyncio.Queue()
    items = []
    producer_tasks = [asyncio.create_task(add_queue_item(item, queue)) for item in range(5)]
    collect_queue_items_task = asyncio.create_task(get_all_queue_items(queue, items))
    await queue.join()
    await asyncio.gather(*producer_tasks)
    collect_queue_items_task.cancel()
    print(items)
    assert sorted(items) == [1, 2, 3, 4, 5]


asyncio.run(main())

有没有办法在get_all_queue_items上面实现,这样我们就可以await <something>得到所有的项目——明确什么是预期的?IE,

    …
    await queue.join()
    await asyncio.gather(*producer_tasks)
    items = await <something>
    print(items)
    assert sorted(items) == [1, 2, 3, 4, 5]

标签: pythonasynchronousqueuepython-asyncio

解决方案


我能够使用哨兵值获得一个实现,以提醒消费者get_all_queue_items队列中没有更多的值,从而将其从循环中断开。get_all_queue_items可以等待计划的任务并将收集的项目包含在其中。

import asyncio
import random


SENTINEL = object()


async def add_queue_item(item, queue):
    # simulate some work
    sleep_interval = random.randint(1, 3)
    await asyncio.sleep(sleep_interval)
    output_item = item + 1
    await queue.put(output_item)


async def get_all_queue_items(queue):
    items = []
    item = await queue.get()
    while item is not SENTINEL:
        items.append(item)
        queue.task_done()
        item = await queue.get()
    queue.task_done()
    return items


async def main():
    queue = asyncio.Queue()
    producer_tasks = [asyncio.create_task(add_queue_item(item, queue)) for item in range(5)]
    collect_queue_items_task = asyncio.create_task(get_all_queue_items(queue))
    await asyncio.gather(*producer_tasks)
    await queue.put(SENTINEL)
    await queue.join()
    items = await collect_queue_items_task
    print(items)
    assert sorted(items) == [1, 2, 3, 4, 5]


asyncio.run(main())

推荐阅读