首页 > 解决方案 > 块大小或超时异步python

问题描述

我试图找出一种简单的方法来解开队列,方法是获取指定chunk_sizetimeout的块。

例如,我希望get_chunks函数返回一个chunk_size项目的列表,如果它需要少于超时来获取它们,否则返回一个长度在 9 和chunk_size之间的列表。

这是到目前为止的代码:

import asyncio

async def populate(queue):
    for i in range(0, 100):
        await queue.put(i)

async def _get_chunks(queue, chunk_size):
    items = []
    for i in range(0, chunk_size):
        items.append(await queue.get())
        await asyncio.sleep(0.2)
    return items

async def get_chunks(queue, chunk_size, timeout):
    while True:
        yield _get_chunks(queue, chunk_size)

async def listen():
    queue = asyncio.Queue()
    await populate(queue)
    print(f'{queue.qsize()} items in queue')
    async for chunk in get_chunks(queue, 10, 1):
        print(await chunk)

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(listen())

if __name__ == '__main__':
    main()  

我认为有一种方法可以做到这asyncio.wait一点:

done, not_done = asyncio.wait([_get_chunks(queue, size),
                               asyncio.sleep(timeout)],
                              return_when=asyncio.FIRST_COMPLETE)
items = done.pop().result()

但我无法在asyncio.sleep第一次返回时得到结果。

标签: python-3.xpython-asyncio

解决方案


您无法获得结果,因为_get_chunks尚未完成。一个简单的解决方法是在和它的调用者之间有一些共享状态_get_chunks

async def _get_chunks(queue, chunk_size, out):
    for i in range(0, chunk_size):
        out.append(await queue.get())
        await asyncio.sleep(0.2)

然后你可以使用 实现超时wait_for,它会自动取消超时协程:

items = []
try:
    asyncio.wait_for(_get_chunk(queue, size, items))
except asyncio.TimeoutError:
    pass
# items now contains the elements _get_chunk managed to extract
# from the queue within the alotted time

推荐阅读