首页 > 解决方案 > 有没有比使用 asyncio 的无限循环更好的方法来捕获“KeyboardInterrupt”?

问题描述

我正在开发的程序在另一个线程中有一个长时间运行的进程。如果出现问题,我想中断该线程。

在我见过的其他 SO 帖子中,它们使用的语法类似于:

while True:
    if condition_here:
        break
    else:
        await asyncio.sleep(1)

这确实有效KeyboardInterrupts。但是,我不喜欢使用这样的 while 循环,并且希望尽可能避免这种情况。

对于一些示例代码,这是我目前拥有的(直到线程完成后才捕获中断):

import asyncio
import time
from threading import Thread

def some_long_process():
    time.sleep(60)

async def main():
    thread = Thread(target=some_long_process)
    thread.start()

    # Doesn't work
    loop = asyncio.get_event_loop()
    await loop.run_in_executor(None, thread.join)
    # await asyncio.wait([loop.run_in_executor(None, thread.join)])
    # await asyncio.wait_for(loop.run_in_executor(None, thread.join), None)
    # await asyncio.gather(asyncio.to_thread(thread.join))

    # Works
    # while thread.is_alive():
    #     await asyncio.sleep(1)

if __name__ == '__main__':
    asyncio.run(main())

如果这是不可能的,我也愿意接受重新考虑我的整个设计方法的建议。谢谢你的时间。

标签: pythonpython-asyncio

解决方案


推荐阅读