python - 将 async-for 与 if 条件结合以中断中间等待的正确方法是什么?
问题描述
如果我有一个协程正在消耗来自异步生成器的项目,那么从外部条件终止该循环的“最佳”方法是什么?
考虑到这一点,
while not self.shutdown_event.is_set():
async with self.external_lib_client as client:
async for message in client:
if self.shutdown_event.is_set():
break
await self.handle(message)
如果我设置shutdown_event
它会跳出while循环,但直到下一个message
被async for
循环处理。构造async for
迭代器的正确方法是什么,如果在产生结果之间满足条件,它可以短路?
有没有标准的方法来添加一个Timeout
?
解决方案
一种方法是将迭代移动到 anasync def
并使用取消:
async def iterate(client):
async for message in client:
# shield() because we want cancelation to cancel retrieval
# of the next message, not ongoing handling of a message
await asyncio.shield(self.handle(message))
async with self.external_lib_client as client:
iter_task = asyncio.create_task(iterate(client))
shutdown_task = asyncio.create_task(self.shutdown_event.wait())
await asyncio.wait([iter_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED)
if iter_task.done():
# iteration has completed, access result to propagate the
# exception if one was raised
iter_task.result()
shutdown_task.cancel()
else:
# shutdown was requested, cancel iteration
iter_task.cancel()
另一种方法是变成shutdown_event
一次性异步流并使用aiostream来监控两者。这样,for
当关闭事件发出信号时,循环会获取一个对象,并且可以跳出循环而无需费心完成等待下一条消息:
# a stream that just yields something (the return value of `wait()`)
# when shutdown_event is set
done_stream = aiostream.stream.just(self.shutdown_event.wait())
async with self.external_lib_client as client, \
aiostream.stream.merge(done_stream, client).stream() as stream:
async for message in stream:
# the merged stream will provide a bogus value (whatever
# `shutdown_event.wait()` returned) when the event is set,
# so check that before using `message`:
if self.shutdown_event.is_set():
break
await self.handle(message)
注意:由于问题中的代码不可运行,以上示例未经测试。
推荐阅读
- c# - C# 中的 IPP 打印 - 有可能吗?
- angular - 如何将更改的对象作为主题发出并仅订阅一个键更改?
- flutter - Flutter 中热重载原生代码变更(插件开发)
- javascript - 从内部事件处理程序访问类/子类函数和属性
- c# - 如何创建随机数列表并确保 C# 彩票程序没有重复项?
- stm32 - STM32:未对齐的循环 DMA UART 缓冲区
- java - 对包含 groovy 类的包使用反射时,“org.reflections.Reflections - 无法获取名称类型”
- java - Gson没有序列化值
- python - 在 IBM Watson 中部署笔记本
- android - 使用 Powerapps 向我的应用程序添加间歇性徽标