首页 > 解决方案 > 在 KeyboardInterrupt 上关闭异步循环 - 运行停止例程

问题描述

我正在使用 python 创建一个脚本,该脚本同时运行并与一些进程交互。为此,我使用 asyncio 来实现这种并行性。主要问题是如何在 KeyboardInterrupt 或 SIGINT 发生时运行另一个清理例程。

这是我为显示问题而编写的示例代码:

import asyncio
import logging
import signal
from time import sleep


class Process:
    async def start(self, arguments):
        self._process = await asyncio.create_subprocess_exec("/bin/bash", *arguments)

        return await self._process.wait()

    async def stop(self):
        self._process.terminate()


class BackgroundTask:

    async def start(self):
        # Very important process which needs to run while process 2 is running
        self._process1 = Process()
        self._process1_task = asyncio.create_task(self._process1.start(["-c", "sleep 100"]))

        self._process2 = Process()
        self._process2_task = asyncio.create_task(self._process2.start(["-c", "sleep 50"]))

        await asyncio.wait([self._process1_task, self._process2_task], return_when=asyncio.ALL_COMPLETED)

    async def stop(self):
        # Stop process
        await self._process1.stop()

        # Call a cleanup process which cleans up process 1
        cleanup_process = Process()
        await cleanup_process.start(["-c", "sleep 10"])

        # After that we can stop our second process
        await self._process2.stop()


backgroundTask = BackgroundTask()


async def main():
    await asyncio.create_task(backgroundTask.start())


logging.basicConfig(level=logging.DEBUG)
asyncio.run(main(), debug=True)

此代码创建一个后台任务,它启动两个进程(在本例中为两个 bash sleep 命令)并等待它们完成。这工作正常,两个命令并行运行。

主要问题是停止程序。我想stop在程序收到 SIGINT 或 KeyboardInterrupt 时运行该方法,它首先停止 process1,然后启动清理方法,然后停止 process2。这是必要的,因为清理命令依赖于 process2。

我尝试过的(而不是 asyncio.run() 和 async main):

def main():
    try:
        asyncio.get_event_loop().run_until_complete(backgroundTask.start())
    except KeyboardInterrupt:
        asyncio.get_event_loop().run_until_complete(backgroundTask.stop())

main()

这当然不能按预期工作,因为一旦发生 KeyboardInterrupt 异常,就会取消 backgroundTask.start 任务并在主循环中启动 backgroundTask.stop,因此我的进程被取消并且无法正确停止。

那么有没有办法在不取消当前主循环的情况下检测 KeyboardInterrupt 并运行我的 backgroundTask.stop 方法呢?

标签: pythonpython-asynciopython-3.7

解决方案


您想添加一个信号处理程序,如文档中的此示例所示:

import asyncio
import functools
import os
import signal

def ask_exit(signame, loop):
    print("got signal %s: exit" % signame)
    loop.stop()

async def main():
    loop = asyncio.get_running_loop()

    for signame in {'SIGINT', 'SIGTERM'}:
        loop.add_signal_handler(
            getattr(signal, signame),
            functools.partial(ask_exit, signame, loop))

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")
print(f"pid {os.getpid()}: send SIGINT or SIGTERM to exit.")

asyncio.run(main())

不过,这有点过于复杂/过时的示例,请考虑更像这样(您的协程代码位于asyncio.sleep调用所在的位置):

import asyncio
from signal import SIGINT, SIGTERM
    

async def main():
    loop = asyncio.get_running_loop()
    for signal_enum in [SIGINT, SIGTERM]:
        loop.add_signal_handler(signal_enum, loop.stop)

    await asyncio.sleep(3600) # Your code here


asyncio.run(main())

此时 Ctrl + C 将打破循环并引发 a RuntimeError,您可以通过将asyncio.run调用放在try/except块中来捕获它,如下所示:

try:
    asyncio.run(main())
except RuntimeError as exc:
    expected_msg = "Event loop stopped before Future completed."
    if exc.args and exc.args[0] == expected_msg:
        print("Bye")
    else:
        raise

但这并不是很令人满意(如果其他原因导致相同的错误怎么办?),所以我更愿意提出一个明显的错误。此外,如果您在命令行上退出,正确的做法是返回正确的退出代码(实际上,示例中的代码只是使用名称,但实际上是一个IntEnum带有数字退出代码的代码! )

import asyncio
from functools import partial
from signal import SIGINT, SIGTERM
from sys import stderr

class SignalHaltError(SystemExit):
    def __init__(self, signal_enum):
        self.signal_enum = signal_enum
        print(repr(self), file=stderr)
        super().__init__(self.exit_code)

    @property
    def exit_code(self):
        return self.signal_enum.value

    def __repr__(self):
        return f"\nExitted due to {self.signal_enum.name}"

def immediate_exit(signal_enum, loop):
    loop.stop()
    raise SignalHaltError(signal_enum=signal_enum)

async def main():
    loop = asyncio.get_running_loop()

    for signal_enum in [SIGINT, SIGTERM]:
        exit_func = partial(immediate_exit, signal_enum=signal_enum, loop=loop)
        loop.add_signal_handler(signal_enum, exit_func)

    await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")

asyncio.run(main())

当 Ctrl + C'd out of 给出:

python cancelling_original.py

⇣</p>

Event loop running for 1 hour, press Ctrl+C to interrupt.
^C
Exitted due to SIGINT
echo $?

⇣</p>

2

现在有一些我很乐意提供的代码!:^)

PS这里是类型注释:

from __future__ import annotations

import asyncio
from asyncio.events import AbstractEventLoop
from functools import partial
from signal import Signals, SIGINT, SIGTERM
from sys import stderr
from typing import Coroutine

class SignalHaltError(SystemExit):
    def __init__(self, signal_enum: Signals):
        self.signal_enum = signal_enum
        print(repr(self), file=stderr)
        super().__init__(self.exit_code)

    @property
    def exit_code(self) -> int:
        return self.signal_enum.value

    def __repr__(self) -> str:
        return f"\nExitted due to {self.signal_enum.name}"

def immediate_exit(signal_enum: Signals, loop: AbstractEventLoop) -> None:
    loop.stop()
    raise SignalHaltError(signal_enum=signal_enum)

async def main() -> Coroutine:
    loop = asyncio.get_running_loop()

    for signal_enum in [SIGINT, SIGTERM]:
        exit_func = partial(immediate_exit, signal_enum=signal_enum, loop=loop)
        loop.add_signal_handler(signal_enum, exit_func)

    return await asyncio.sleep(3600)

print("Event loop running for 1 hour, press Ctrl+C to interrupt.")

asyncio.run(main())

此处自定义异常的优点是您可以专门捕获它,并避免将回溯转储到屏幕上

try:
    asyncio.run(main())
except SignalHaltError as exc:
    # log.debug(exc)
    pass
else:
    raise

推荐阅读