python - 是否可以检测到所有异步任务何时暂停?
问题描述
我正在尝试测试异步代码,但由于某些任务之间的复杂连接,我遇到了麻烦。
我需要的上下文是一些代码,它在另一个进程写入文件的同时读取文件。代码中有一些逻辑,读取截断的记录会使它后退并wait()
在asyncio.Condition
稍后由inotify事件释放。当未来的写入已由另一个进程完成时,此代码应通过重新读取记录来使其恢复。我特别想测试这种恢复是否有效。
所以我的计划是:
- 写一个部分文件
- 运行事件循环,直到它在条件下挂起
- 写文件的其余部分
- 运行事件循环完成
我以为这是分析器:Detect an idle asyncio event loop
然而,试验测试表明它退出得太早了:
import asyncio
import random
def test_ping_pong():
async def ping_pong(idx: int, oth_idx: int):
for i in range(random.randint(100, 1000)):
counters[idx] += 1
async with conditions[oth_idx]:
conditions[oth_idx].notify()
async with conditions[idx]:
await conditions[idx].wait()
async def detect_iowait():
loop = asyncio.get_event_loop()
rsock, wsock = socket.socketpair()
wsock.close()
try:
await loop.sock_recv(rsock, 1)
finally:
rsock.close()
conditions = [asyncio.Condition(), asyncio.Condition()]
counters = [0, 0]
loop = asyncio.get_event_loop()
loop.create_task(ping_pong(0, 1))
loop.create_task(ping_pong(1, 0))
loop.run_until_complete(loop.create_task(detect_iowait()))
assert counters[0] > 10
assert counters[1] > 10
解决方案
在挖掘了 python 事件循环的源代码之后,我没有发现任何可以公开执行此操作的公开内容。
然而,可以使用_ready
由BaseEventLoop
. 见这里。这包含立即准备运行的每个任务。当任务运行时,它会从_ready
双端队列中弹出。当一个挂起的任务被另一个任务释放时(例如通过调用future.set_result()),挂起的任务立即被添加回双端队列。这从 python 3.5 开始就存在了。
您可以做的一件事是反复注入回调以检查_ready
. 当所有其他任务暂停时,回调运行时 dqueue 中将没有任何内容。
回调将在事件循环的每次迭代中最多运行一次:
async def wait_for_deadlock(empty_loop_threshold: int = 0):
def check_for_deadlock():
nonlocal empty_loop_count
# pylint: disable=protected-access
if loop._ready:
empty_loop_count = 0
loop.call_soon(check_for_deadlock)
elif empty_loop_count < empty_loop_threshold:
empty_loop_count += 1
loop.call_soon(check_for_deadlock)
else:
future.set_result(None)
empty_loop_count = 0
loop = asyncio.get_running_loop()
future = loop.create_future()
asyncio.get_running_loop().call_soon(check_for_deadlock)
await future
在上面的代码中,empty_loop_threshold
在大多数情况下并不是真正需要的,但在任务与 IO 通信的情况下存在。例如,如果一个任务通过 IO 与另一个任务进行通信,则可能存在所有任务暂停的时刻,即使一个任务已准备好读取数据。设置empty_loop_threshold = 1
应该解决这个问题。
使用它相对简单。你可以:
loop.run_until_complete(wait_for_deadlock())
或按我的问题要求:
def some_test():
async def async_test():
await wait_for_deadlock()
inject_something()
await wait_for_deadlock()
loop = loop.get_event_loop()
loop.create_task(task_to_test())
loop.run_until_complete(loop.create_task(async_test)
assert something
推荐阅读
- java - 出现意外错误(类型=内部服务器错误,状态=500)。org/eclipse/jdt/internal/compiler/env/INameEnvironment
- java - 套接字编程,尝试连接到服务器会冻结应用程序和风车
- sql-server - 在 sqlserver 中比较 Varchar 时出现问题。使用 = 不起作用
- css - Css flexbox 条件包装
- visual-studio-code - 如何添加/编辑上下文菜单
- php - 从 html 中查找所有单词(或句子)
- python - Postgres 数据库安全性:在环境变量中存储什么?
- c++ - 是否可以编译具有在编译时无法解决的外部依赖关系的静态库?
- android - Crashlytics.logException 方法抛出非法状态异常。无法收集某些活动的非致命问题
- snmp - snmptrap v3 错误“在 TRAP2 PDU 中找不到 TrapOID”