首页 > 解决方案 > 将 async-for 与 if 条件结合以中断中间等待的正确方法是什么?

问题描述

如果我有一个协程正在消耗来自异步生成器的项目,那么从外部条件终止该循环的“最佳”方法是什么?

考虑到这一点,

while not self.shutdown_event.is_set():
    async with self.external_lib_client as client:
        async for message in client:
            if self.shutdown_event.is_set():
                break
            await self.handle(message)

如果我设置shutdown_event它会跳出while循环,但直到下一个messageasync for循环处理。构造async for迭代器的正确方法是什么,如果在产生结果之间满足条件,它可以短路?

有没有标准的方法来添加一个Timeout

标签: pythonpython-3.xasynchronouspython-asyncio

解决方案


一种方法是将迭代移动到 anasync def并使用取消:

async def iterate(client):
    async for message in client:
        # shield() because we want cancelation to cancel retrieval
        # of the next message, not ongoing handling of a message
        await asyncio.shield(self.handle(message))

async with self.external_lib_client as client:
    iter_task = asyncio.create_task(iterate(client))
    shutdown_task = asyncio.create_task(self.shutdown_event.wait())
    await asyncio.wait([iter_task, shutdown_task],
                       return_when=asyncio.FIRST_COMPLETED)
    if iter_task.done():
        # iteration has completed, access result to propagate the
        # exception if one was raised
        iter_task.result()
        shutdown_task.cancel()
    else:
        # shutdown was requested, cancel iteration
        iter_task.cancel()

另一种方法是变成shutdown_event一次性异步流并使用aiostream来监控两者。这样,for当关闭事件发出信号时,循环会获取一个对象,并且可以跳出循环而无需费心完成等待下一条消息:

# a stream that just yields something (the return value of `wait()`)
# when shutdown_event is set
done_stream = aiostream.stream.just(self.shutdown_event.wait())

async with self.external_lib_client as client, \
        aiostream.stream.merge(done_stream, client).stream() as stream:
    async for message in stream:
        # the merged stream will provide a bogus value (whatever
        # `shutdown_event.wait()` returned) when the event is set,
        # so check that before using `message`:
        if self.shutdown_event.is_set():
            break
        await self.handle(message)

注意:由于问题中的代码不可运行,以上示例未经测试。


推荐阅读