首页 > 解决方案 > 在单个任务中捕获异常并重新启动它们

问题描述

asyncio如果我在顶级类中创建一系列任务,所有这些基本上都应该永远运行,如下所示:

asyncio.create_task(...)
asyncio.create_task(...)
asyncio.create_task(...)
...

self.event_loop.run_forever()

# Once we fall out of the event loop, collect all remaining tasks,
# cancel them, and terminate the asyncio event loop
tasks = asyncio.Task.all_tasks()
group = asyncio.gather(*tasks, return_exceptions=True)
group.cancel()
self.event_loop.run_until_complete(group)
self.event_loop.close()

上面的代码不能处理以下情况,我发现我需要越来越多,而且我在谷歌搜索或asyncio文档中没有看到示例:

如果其中一项任务因异常而失败,则不会处理该异常 - 所有其他任务都会继续进行,但该任务只是静默停止(异常输出除外)。

那么,我该如何:

标签: pythonpython-3.xpython-asyncio

解决方案


未捕获的异常附加到任务对象,并且可以通过Task.exception()方法从中检索。该asyncio.create_task(...)调用返回任务对象,因此您需要收集这些对象以检查异常。

如果您想在发生异常时重新安排任务,那么您希望在新任务中执行此操作(因为您希望它在事件循环中运行),或者使用捕获异常的包装协程并再次重新运行给定的协程。

后者可能看起来像:

import traceback

async def rerun_on_exception(coro, *args, **kwargs):
    while True:
        try:
            await coro(*args, **kwargs)
        except asyncio.CancelledError:
            # don't interfere with cancellations
            raise
        except Exception:
            print("Caught exception")
            traceback.print_exc()

然后在将它们安排为任务时用上面的协程包装你的协程:

asyncio.create_task(rerun_on_exception(coroutine_uncalled, arg1value, ... kwarg1=value, ...)

例如,每次出现异常时传入参数以创建协程。

另一种选择是asyncio.wait()在单独的任务中使用,这样您就可以在循环运行时监控异常,然后决定如何处理异常:

async def def exception_aware_scheduler(*task_definitions):
    tasks = {
        asyncio.create_task(coro(*args, **kwargs)): (coro, args, kwargs)
        for coro, args, kwargs in task_definitions
    }
    while tasks:
        done, pending = await asyncio.wait(
            tasks.keys(), return_when=asyncio.FIRST_EXCEPTION
        )
        for task in done:
            if task.exception() is not None:
                print('Task exited with exception:')
                task.print_stack()
                print('Rescheduling the task\n')
                coro, args, kwargs = tasks.pop(task)
                tasks[asyncio.create_task(coro(*args, **kwargs))] = coro, args, kwargs

asyncio.wait()任何一个计划的任务由于异常退出时,事件循环再次赋予调用控制权,但在此之前,任务可能已被取消或简单地完成它们的工作。当任务因异常退出时,您需要一种方法来再次创建相同的协程(使用相同的参数),因此*args, **kwargs上面的设置。

您只需安排exception_aware_scheduler(), 传递您想要传递的任务:

task_definitions = (
    (coro1, (), {}),  # no arguments
    (coro2, ('arg1', 'arg2'), {}),
    # ...
)
asyncio.create_task(exception_aware_scheduler(*task_definitions))

推荐阅读