首页 > 解决方案 > 异步任务取消

问题描述

这是我为更好地理解任务取消而创建的测试脚本 -

import asyncio
import random
import signal
import traceback

async def shutdown(signame, loop):
    print("Shutting down")
    tasks = [task for task in asyncio.Task.all_tasks()]
    for task in tasks:
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            print("Task cancelled: %s", task)   
    loop.stop()

async def another():
    await asyncio.sleep(2)

async def some_other_process():
    await asyncio.sleep(5)
    return "Me"

async def process(job, loop, i):
    print(i)
    task = loop.create_task(some_other_process())
    value = await task

    if i < 1:
        another_task = loop.create_task(another())
        await another_task
    # await some_other_process()

def pull(loop):
    i = 0
    while True:
        job = f"random-integer-{random.randint(0, 100)}"
        try:
            loop.run_until_complete(process(job, loop, i))
            i += 1
        except asyncio.CancelledError as e:
            print("Task cancelled")
            break
        except Exception:
            print(traceback.format_exc())
    # asyncio.get_event_loop().stop()       


def main():
    try:
        loop = asyncio.get_event_loop()

        for signame in ['SIGINT']:
            loop.add_signal_handler(
                getattr(signal, signame),
                lambda: asyncio.ensure_future(shutdown(signame, loop))
            )

        try:
            pull(loop)
        except Exception:
            print(traceback.format_exc())
        finally:
            loop.close()
    finally:
        print("Done")

if __name__ == "__main__":
    main()

我不明白为什么我看到 -

Task was destroyed but it is pending!
task: <Task cancelling coro=<shutdown() done, defined at test.py:6>>

标签: pythonpython-asynciocancellation

解决方案


loop.add_signal_handler(
    getattr(signal, signame),
    lambda: asyncio.ensure_future(shutdown(signame, loop))
)

在这里使用asyncio.ensure_future您为协程创建任务shutdown,但您不会在任何地方等待该任务完成。稍后当您关闭事件循环时,它会警告您此任务正在等待处理。


更新:

如果你想做一些清理,最好的地方是在loop.close()你的脚本结束之前(信号、异常等)

尝试以这种方式更改您的代码:

# ...

async def shutdown(loop):  # remove `signal` arg

# ...

def main():
    try:
        loop = asyncio.get_event_loop()
        try:
            pull(loop)
        except Exception:
            print(traceback.format_exc())
        finally:
            loop.run_until_complete(shutdown(loop))  # just run until shutdown is done
            loop.close()
    finally:
        print("Done")

# ...

更新2:

如果您仍然需要信号处理程序,您可能想要执行以下操作:

from functools import partial

loop.add_signal_handler(
    getattr(signal, signame),
    partial(cb, signame, loop)
)

def cb(signame, loop):
    loop.stop()
    loop.run_until_complete(shutdown(signame, loop))

推荐阅读