python - 使用 asyncio 处理超时
问题描述
免责声明:这是我第一次尝试该asyncio
模块。
我正在使用asyncio.wait
以下方式尝试支持超时功能,以等待来自一组异步任务的所有结果。这是一个更大的库的一部分,所以我省略了一些不相关的代码。
请注意,该库已经支持通过 ThreadPoolExecutors 和 ProcessPoolExecutors 提交任务和使用超时,所以我对使用它们的建议或关于我为什么使用asyncio
. 上代码...
import asyncio
from contextlib import suppress
...
class AsyncIOSubmit(Node):
def get_results(self, futures, timeout=None):
loop = asyncio.get_event_loop()
finished, unfinished = loop.run_until_complete(
asyncio.wait(futures, timeout=timeout)
)
if timeout and unfinished:
# Code options in question would go here...see below.
raise asyncio.TimeoutError
起初我并不担心在超时时取消挂起的任务,但后来我Task was destroyed but it is pending!
在程序退出或loop.close
. 经过一番研究,我发现了多种取消任务并等待它们实际被取消的方法:
选项1:
[task.cancel() for task in unfinished]
for task in unfinished:
with suppress(asyncio.CancelledError):
loop.run_until_complete(task)
选项 2:
[task.cancel() for task in unfinished]
loop.run_until_complete(asyncio.wait(unfinished))
选项 3:
# Not really an option for me, since I'm not in an `async` method
# and don't want to make get_results an async method.
[task.cancel() for task in unfinished]
for task in unfinished:
await task
选项 4:
类似于this answer中的某种while循环。似乎我的其他选择更好,但包括完整性。
到目前为止,选项 1 和 2 似乎都可以正常工作。任何一种选择都可能是“正确的”,但随着asyncio
多年来的发展,网络上的示例和建议要么已经过时,要么变化很大。所以我的问题是...
问题 1
选项 1 和 2 之间有什么实际区别吗?我知道run_until_complete
会一直运行到未来完成,所以由于选项 1 以特定顺序循环,我想如果早期的任务需要更长的时间才能真正完成,它的行为可能会有所不同。我尝试查看 asyncio 源代码以了解是否asyncio.wait
只是有效地对其任务/未来做同样的事情,但这并不明显。
问题2
我假设如果其中一项任务处于长时间运行的阻塞操作的中间,它实际上可能不会立即取消?也许这仅取决于所使用的底层操作或库是否会立即引发 CancelledError?也许为 asyncio 设计的库永远不会发生这种情况?
由于我在这里尝试实现超时功能,因此我对此有些敏感。如果这些事情可能需要很长时间才能取消,我会考虑打电话cancel
而不是等待它实际发生,或者设置一个非常短的超时来等待取消完成。
问题 3
是否有可能loop.run_until_complete
(或者实际上,对 的底层调用)出于超时以外的原因async.wait
返回值?unfinished
如果是这样,我显然必须稍微调整一下我的逻辑,但从文档来看,这似乎是不可能的。
解决方案
选项 1 和 2 之间有什么实际区别吗?
不。选项 2 看起来更好,可能效率更高,但它们的净效果是相同的。
我知道
run_until_complete
会一直运行到未来完成,所以由于选项 1 以特定顺序循环,我想如果早期的任务需要更长的时间才能真正完成,它的行为可能会有所不同。
一开始似乎是这样,但实际上并非如此,因为loop.run_until_complete
运行提交给循环的所有任务,而不仅仅是作为参数传递的任务。它只是在提供的等待完成后停止- 这就是“运行直到完成”所指的内容。对已调度任务的循环调用run_until_complete
类似于以下异步代码:
ts = [asyncio.create_task(asyncio.sleep(i)) for i in range(1, 11)]
# takes 10s, not 55s
for t in ts:
await t
这又在语义上等效于以下线程代码:
ts = []
for i in range(1, 11):
t = threading.Thread(target=time.sleep, args=(i,))
t.start()
ts.append(t)
# takes 10s, not 55s
for t in ts:
t.join()
换句话说,await t
并run_until_complete(t)
阻止直到t
完成,但允许其他所有内容 - 例如先前计划使用asyncio.create_task()
的任务也可以在此期间运行。所以总运行时间将等于最长任务的运行时间,而不是它们的总和。例如,如果第一个任务恰好需要很长时间,那么所有其他任务都将在此期间完成,并且它们的 await 根本不会休眠。
所有这些仅适用于先前已安排的等待任务。如果您尝试将其应用于协程,它将不起作用:
# runs for 55s, as expected
for i in range(1, 11):
await asyncio.sleep(i)
# also 55s - we didn't call create_task() so it's equivalent to the above
ts = [asyncio.sleep(i) for i in range(1, 11)]
for t in ts:
await t
# also 55s
for i in range(1, 11):
t = threading.Thread(target=time.sleep, args=(i,))
t.start()
t.join()
对于 asyncio 初学者来说,这通常是一个症结所在,他们编写的代码与上一个 asyncio 示例等效,并希望它能够并行运行。
我尝试查看 asyncio 源代码以了解是否
asyncio.wait
只是有效地对其任务/未来做同样的事情,但这并不明显。
asyncio.wait
只是一个方便的 API,它做两件事:
- 将输入参数转换为实现
Future
. 对于协程,这意味着它将它们提交给事件循环,就像 with 一样create_task
,这允许它们独立运行。如果您一开始就给它任务,就像您做的那样,这一步将被跳过。 - 用于
add_done_callback
在期货完成时收到通知,此时它会恢复其调用者。
所以是的,它做同样的事情,但实现不同,因为它支持更多的特性。
我假设如果其中一项任务处于长时间运行的阻塞操作的中间,它实际上可能不会立即取消?
在 asyncio 中不应该有“阻塞”操作,只有那些暂停的操作,它们应该立即取消。例外情况是阻塞附加到 asyncio with 的代码run_in_executor
,其中底层操作根本不会取消,但 asyncio 协程将立即获得异常。
也许这仅取决于所使用的底层操作或库是否会立即引发 CancelledError?
库不会raise CancelledError
,它会在发生取消之前碰巧暂停的等待点接收它。对于图书馆来说,取消的效果是await ...
中断它的等待并立即提升CancelledError
。除非被捕获,否则异常将通过函数传播并await
一直调用到顶级协程,其引发CancelledError
将整个任务标记为已取消。表现良好的 asyncio 代码将做到这一点,可能finally
用于释放它们持有的操作系统级资源。当CancelledError
被捕获时,代码可以选择不重新引发它,在这种情况下,取消被有效地忽略。
是否有可能 loop.run_until_complete (或者实际上,对 的底层调用
async.wait
)由于超时以外的原因返回未完成的值?
如果您正在使用return_when=asyncio.ALL_COMPLETE
(默认),那应该是不可能的。很可能return_when=FIRST_COMPLETED
,那么显然可以独立于超时。
推荐阅读
- mysql - 比较数字与字母数字
- python - 如何导入模块并将其绑定到 tkinter 窗口中的按钮
- c# - 如何使用这个 Nuget 包 Bot.Builder.Community.Middleware.Typing?
- python-3.x - 在熊猫中添加新列,python min 函数显示错误
- shell - 捕获子进程ID并将其保存到文件中以杀死它
- java - if/if - else/ else 带 lambda 表达式
- java - 如何比较具有不同名称引用但具有相同实际数据的两个列表
- android - 我在我的 Android 回收站视图中没有看到 mp3 文件名,而是看到了他们的链接
- c++ - 尝试对大型已排序容器进行排序时,快速排序生成退出代码 -1073741571 (0xC00000FD)
- vba - 我的 MS ACCESS VBA 函数计算新的纬度/经度坐标的错误在哪里?