首页 > 解决方案 > 是否可以检测到所有异步任务何时暂停?

问题描述

我正在尝试测试异步代码,但由于某些任务之间的复杂连接,我遇到了麻烦。

我需要的上下文是一些代码,它在另一个进程写入文件的同时读取文件。代码中有一些逻辑,读取截断的记录会使它后退并wait()asyncio.Condition稍后由inotify事件释放。当未来的写入已由另一个进程完成时,此代码应通过重新读取记录来使其恢复。我特别想测试这种恢复是否有效。

所以我的计划是:


我以为这是分析器:Detect an idle asyncio event loop

然而,试验测试表明它退出得太早了:

import asyncio
import random


def test_ping_pong():
    async def ping_pong(idx: int, oth_idx: int):
        for i in range(random.randint(100, 1000)):
            counters[idx] += 1
            async with conditions[oth_idx]:
                conditions[oth_idx].notify()
            async with conditions[idx]:
                await conditions[idx].wait()

    async def detect_iowait():
        loop = asyncio.get_event_loop()
        rsock, wsock = socket.socketpair()
        wsock.close()
        try:
            await loop.sock_recv(rsock, 1)
        finally:
            rsock.close()
    conditions = [asyncio.Condition(), asyncio.Condition()]
    counters = [0, 0]


    loop = asyncio.get_event_loop()
    loop.create_task(ping_pong(0, 1))
    loop.create_task(ping_pong(1, 0))
    loop.run_until_complete(loop.create_task(detect_iowait()))

    assert counters[0] > 10
    assert counters[1] > 10

标签: pythonunit-testingpython-asyncio

解决方案


在挖掘了 python 事件循环的源代码之后,我没有发现任何可以公开执行此操作的公开内容。

然而,可以使用_readyBaseEventLoop. 见这里。这包含立即准备运行的每个任务。当任务运行时,它会从_ready双端队列中弹出。当一个挂起的任务被另一个任务释放时(例如通过调用future.set_result()),挂起的任务立即被添加回双端队列。这从 python 3.5 开始就存在了。

您可以做的一件事是反复注入回调以检查_ready. 当所有其他任务暂停时,回调运行时 dqueue 中将没有任何内容。

回调将在事件循环的每次迭代中最多运行一次:

async def wait_for_deadlock(empty_loop_threshold: int = 0):
    def check_for_deadlock():
        nonlocal empty_loop_count

        # pylint: disable=protected-access
        if loop._ready:
            empty_loop_count = 0
            loop.call_soon(check_for_deadlock)
        elif empty_loop_count < empty_loop_threshold:
            empty_loop_count += 1
            loop.call_soon(check_for_deadlock)
        else:
            future.set_result(None)

    empty_loop_count = 0
    loop = asyncio.get_running_loop()
    future = loop.create_future()
    asyncio.get_running_loop().call_soon(check_for_deadlock)
    await future

在上面的代码中,empty_loop_threshold在大多数情况下并不是真正需要的,但在任务与 IO 通信的情况下存在。例如,如果一个任务通过 IO 与另一个任务进行通信,则可能存在所有任务暂停的时刻,即使一个任务已准备好读取数据。设置empty_loop_threshold = 1应该解决这个问题。

使用它相对简单。你可以:

loop.run_until_complete(wait_for_deadlock())

或按我的问题要求:

def some_test():
    async def async_test():
        await wait_for_deadlock()
        inject_something()
        await wait_for_deadlock()
            

    loop = loop.get_event_loop()
    loop.create_task(task_to_test())
    loop.run_until_complete(loop.create_task(async_test)
    assert something

推荐阅读