python - 关闭无限异步生成器
问题描述
可重现的错误
我试图在此处的在线 REPL 中重现该错误。async for response in position_stream()
但是,它与我的真实代码(我在其中,而不是for position in count()
在 REPL 中)的实现(因此行为)并不完全相同。
关于我的实际实施的更多细节
我在某处定义了一个协程,如下所示:
async def position(self):
request = telemetry_pb2.SubscribePositionRequest()
position_stream = self._stub.SubscribePosition(request)
try:
async for response in position_stream:
yield Position.translate_from_rpc(response)
finally:
position_stream.cancel()
其中 position_stream 是无限的(或可能非常持久)。我从这样的示例代码中使用它:
async def print_altitude():
async for position in drone.telemetry.position():
print(f"Altitude: {position.relative_altitude_m}")
并print_altitude()
在循环中运行:
asyncio.ensure_future(print_altitude())
asyncio.get_event_loop().run_forever()
效果很好。现在,在某个时候,我想关闭来自调用者的流。我以为我可以运行 asyncio.ensure_future(loop.shutdown_asyncgens())
并等待我的finally
close 上面被调用,但它没有发生。
相反,我收到有关未检索异常的警告:
Task exception was never retrieved
future: <Task finished coro=<print_altitude() done, defined at [...]
为什么会这样,我怎样才能使我的所有异步生成器实际上都关闭(并运行它们的finally
子句)?
解决方案
首先,如果你stop
是一个循环,你的协程都没有机会正常关闭。调用close
基本上意味着不可逆转地破坏循环。
如果你不关心那些正在运行的任务会发生什么,你可以简单地cancel
把它们都简单化,这也会停止异步生成器:
import asyncio
from contextlib import suppress
async def position_stream():
while True:
await asyncio.sleep(1)
yield 0
async def print_position():
async for position in position_stream():
print(f'position: {position}')
async def cleanup_awaiter():
await asyncio.sleep(3)
print('cleanup!')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
asyncio.ensure_future(print_position())
asyncio.ensure_future(print_position())
loop.run_until_complete(cleanup_awaiter())
# get all running tasks:
tasks = asyncio.gather(*asyncio.Task.all_tasks())
# schedule throwing CancelledError into the them:
tasks.cancel()
# allow them to process the exception and be cancelled:
with suppress(asyncio.CancelledError):
loop.run_until_complete(tasks)
finally:
print('closing loop')
loop.close()
推荐阅读
- python - 为什么我在使用 boto ecr 客户端登录后拒绝推送 docker?
- c# - 无法对视图中的空引用执行运行时绑定
- javascript - React Native - 从 Child 调用 Parent ref 函数
- ansible - ansible“取消归档”模块未按预期工作
- angular - 如何每 30 秒刷新一次 observable?
- asp.net-web-api - 将请求正文中使用 JSON 发送的模型属性与绑定其默认值的属性区分开来
- ios - 更改 navigationItem.leftBarButtonItem 的位置
- amazon-web-services - 适用于 HTTP NGINX 的 AWS EC2 安全组自定义 TCP 规则不起作用
- javascript - 如何在 javascript 中设置音频 blob 的编解码器、采样率和比特率?
- c# - 将第一个孩子与 linq 动态进行比较