python - 配合 asyncio 使用的 ProcessPoolExecutor 在所有任务完成之前被关闭
问题描述
我想将 ProcessPoolExecutor 与 asyncio 结合起来,并发运行 TestClass 中的阻塞函数。每个任务都会长时间运行,因此我需要一个可靠的关闭流程,使脚本退出时能够平稳收尾。我应该在哪里添加对 KeyboardInterrupt 的错误处理,才能平稳地关闭所有任务和进程?我搜索了很多相关主题,但没有一个能解决我的问题。希望能得到一些帮助!提前致谢。
import asyncio
from concurrent.futures import ProcessPoolExecutor
class TestClass:
    """Placeholder object that the tasks construct through the executor.

    The constructor performs no I/O; it only stores two integer attributes.
    """

    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2
async def task(loop, executor_processes, i):
    """Build a ``TestClass`` through the executor and print progress.

    Args:
        loop: Event loop whose ``run_in_executor`` schedules the call.
        executor_processes: Executor handed to ``run_in_executor``.
        i: Task number; only used in the progress messages.
    """
    print(f"[TASK {i}] Initializing Abck class")
    # ``TestClass`` itself is the callable, so the instance is created by the
    # executor and the result is awaited here.
    new_test = await loop.run_in_executor(executor_processes, TestClass)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
async def main():
    """Start 99 ``task`` coroutines on a 5-worker process pool and await them.

    NOTE: this is the code from the question; the pool is created without any
    explicit shutdown or Ctrl-C handling, which is the behaviour being asked
    about.
    """
    executor_processes = ProcessPoolExecutor(max_workers=5)
    loop_ = asyncio.get_event_loop()
    tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
    await asyncio.gather(*tasks)
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Only the parent process's KeyboardInterrupt is handled here.
        print("ctrl + c")
    finally:
        print('Program finished')
以下是在所有任务和进程完成之前按 ctrl + c 后的错误日志。
Fatal Python error: Fatal Python error: init_import_sizeinit_import_size: : Failed to import the site moduleFailed to import the site module
Python runtime state: Python runtime state: initializedinitialized
Traceback (most recent call last):
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
Fatal Python error: init_import_size: Failed to import the site module
Python runtime state: initialized
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 580, in <module>
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
main()
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
main()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 563, in main
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
addsitepackages(known_paths, [sys.prefix])
addsitedir(sitedir, known_paths)
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
known_paths = venv(known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 495, in venv
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
addsitedir(sitedir, known_paths)
exec(line)
exec(line)
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "<string>", line 1, in <module>
File "<string>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 2, in <module>
exec(line)
File "<string>", line 1, in <module>
from contextlib import contextmanager
from . import abc
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 6, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/abc.py", line 4, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 169, in addpackage
addsitepackages(known_paths, [sys.prefix])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 350, in addsitepackages
from contextlib import contextmanager
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 165, in <module>
exec(line)
File "<string>", line 1, in <module>
addsitedir(sitedir, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 208, in addsitedir
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/importlib/util.py", line 14, in <module>
from contextlib import contextmanager
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/contextlib.py", line 5, in <module>
addpackage(sitedir, name, known_paths)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/site.py", line 160, in addpackage
f = io.TextIOWrapper(io.open_code(fullname))
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 779, in exec_module
File "<frozen importlib._bootstrap_external>", line 911, in get_code
File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
KeyboardInterrupt
from . import machinery
KeyboardInterrupt
from functools import wraps
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/functools.py", line 438, in <module>
from collections import deque
class _AsyncGeneratorContextManager(_GeneratorContextManagerBase,
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 21, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/abc.py", line 85, in __new__
cls = super().__new__(mcls, name, bases, namespace, **kwargs)
KeyboardInterrupt
from operator import itemgetter as _itemgetter, eq as _eq
File "<frozen importlib._bootstrap>", line 991, in _find_and_load
File "<frozen importlib._bootstrap>", line 975, in _find_and_load_unlocked
File "<frozen importlib._bootstrap>", line 671, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 779, in exec_module
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/collections/__init__.py", line 394, in namedtuple
File "<frozen importlib._bootstrap_external>", line 911, in get_code
File "<frozen importlib._bootstrap_external>", line 580, in _compile_bytecode
Exception in thread QueueManagerThread:
Traceback (most recent call last):
KeyboardInterrupt
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 932, in _bootstrap_inner
exec(s, namespace)
File "<string>", line 1, in <module>
ctrl + c
Program finished
KeyboardInterrupt
self.run()
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/threading.py", line 870, in run
self._target(*self._args, **self._kwargs)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/process.py", line 394, in _queue_management_worker
work_item.future.set_exception(bpe)
File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/concurrent/futures/_base.py", line 539, in set_exception
raise InvalidStateError('{}: {!r}'.format(self._state, self))
concurrent.futures._base.InvalidStateError: CANCELLED: <Future at 0x7ffed1f2f250 state=cancelled>
解决方案
Windows 解决方案
如果您在 Windows 上运行,那么 CTRL-C 中断处理似乎不适用于多处理池。以下内容有点笨拙,但似乎很流行。
这个想法是:用一个初始值为 `False` 的全局变量 `ctrl_c_entered` 来初始化多进程池中的每个进程。我为你的 `TestClass` 补充了一个方法 `foo`,它将作为被调用的工作函数。该函数被调用时必须:
1. 检查全局标志 `ctrl_c_entered`,如果为 True,则立即返回。
2. 拥有自己的 KeyboardInterrupt 处理逻辑:捕获到该中断时,必须将全局标志 `ctrl_c_entered` 设置为 True 并返回。
3. 更新:然而,用户可能在池进程尚未把控制权交给工作函数时按下 CTRL-C,例如它正在从输入队列中获取下一个要运行的任务。在这种情况下,没有任何生效的 `try/except` 来捕获 KeyboardInterrupt 异常。因此,我们需要为池中的每个进程设置一个 SIGINT 信号处理程序,由它把 `ctrl_c_entered` 标志设置为 `True`。但这也意味着,在上面的第 2 步中必须临时恢复原来的默认 `SIGINT` 处理程序,这样才能捕获到 KeyboardInterrupt 异常。
您还必须让所有已提交的 asyncio 任务运行完成。因此,我们为主进程设置了一个 `signal.SIGINT` 处理程序:当用户按下 CTRL-C 时,它把主进程中的全局标志 `ctrl_c_entered` 设置为 `True`(这样就不会从 `asyncio.run(main())` 语句中跳出)。长时间运行的 asyncio 任务必须检查这个 `ctrl_c_entered` 标志,并在它被设置为 True 时终止。
import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time
from functools import wraps
def handle_ctrl_c(func):
    """Decorate a worker function so Ctrl-C is reported as a return value.

    The wrapper relies on three module globals: ``ctrl_c_entered``,
    ``default_sigint_handler`` and ``pool_ctrl_c_handler``.

    Returns:
        A wrapper that returns ``func``'s result, or a ``KeyboardInterrupt``
        *instance* (returned, not raised) when Ctrl-C was already recorded or
        arrives while ``func`` is running.
    """
    @wraps(func)
    def wrapper(*args, **kwargs):
        global ctrl_c_entered
        if ctrl_c_entered:
            # Ctrl-C was already seen in this process: do not start the work.
            return KeyboardInterrupt()
        try:
            # Re-arm the saved handler *inside* the try block so that an
            # interrupt arriving right after the switch is still caught below.
            signal.signal(signal.SIGINT, default_sigint_handler)  # the default
            return func(*args, **kwargs)
        except KeyboardInterrupt:
            ctrl_c_entered = True
            return KeyboardInterrupt()
        finally:
            # Back to the flag-setting handler for the time between calls.
            signal.signal(signal.SIGINT, pool_ctrl_c_handler)
    return wrapper
class TestClass:
    """Example worker class whose ``foo`` method is run through the executor."""

    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

    @handle_ctrl_c
    def foo(self, i):
        """Sleep for one second, then return ``i`` squared.

        Because of ``handle_ctrl_c`` the caller may receive a
        ``KeyboardInterrupt`` instance instead of the number.
        """
        time.sleep(1)
        return i ** 2
async def task(loop, executor_processes, i):
    """Run ``TestClass().foo(i)`` through the executor and return its result.

    Args:
        loop: Event loop whose ``run_in_executor`` schedules the call.
        executor_processes: Executor handed to ``run_in_executor``.
        i: Task number; printed in the messages and passed to ``foo``.

    Returns:
        Whatever ``foo`` returned, or a ``KeyboardInterrupt`` instance when
        ``ctrl_c_entered`` was already set before the work was submitted.
    """
    # If this is a long-running task, periodically check the ctrl_c_entered
    # flag and return if it is set.
    # For example:
    if ctrl_c_entered:
        return KeyboardInterrupt()
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass().foo, i)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
    return new_test
def pool_ctrl_c_handler(*args, **kwargs):
    """Signal handler that only records Ctrl-C in ``ctrl_c_entered``.

    All arguments are accepted and ignored.
    """
    global ctrl_c_entered
    ctrl_c_entered = True
def init_pool():
    """Pool initializer: reset the Ctrl-C flag and install the pool handler.

    The handler that was active before is saved in ``default_sigint_handler``
    so that it can be restored later.
    """
    # set global variable for each process in the pool:
    global ctrl_c_entered
    global default_sigint_handler
    ctrl_c_entered = False
    # signal.signal() returns the previously installed handler.
    default_sigint_handler = signal.signal(signal.SIGINT, pool_ctrl_c_handler)
async def main():
    """Run 99 ``task`` coroutines on a 5-worker pool and print their results.

    Every worker process is prepared by ``init_pool``.
    """
    # The ``with`` block shuts the pool down once the tasks have been gathered
    # instead of leaving the worker processes to interpreter exit.
    with ProcessPoolExecutor(max_workers=5, initializer=init_pool) as executor_processes:
        loop_ = asyncio.get_running_loop()
        tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
        results = await asyncio.gather(*tasks)
    print(results)
def ctrl_c_handler(*args, **kwargs):
    """Signal handler that only sets the global ``ctrl_c_entered`` flag.

    All arguments are accepted and ignored.
    """
    global ctrl_c_entered
    ctrl_c_entered = True
if __name__ == '__main__':
    ctrl_c_entered = False
    # Install the flag-setting handler before the loop starts, so Ctrl-C sets
    # the flag instead of raising KeyboardInterrupt out of asyncio.run().
    signal.signal(signal.SIGINT, ctrl_c_handler)
    asyncio.run(main())
    print('Program finished')
输出:
[TASK 1] Initializing Abck class
[TASK 2] Initializing Abck class
[TASK 3] Initializing Abck class
[TASK 4] Initializing Abck class
[TASK 5] Initializing Abck class
[TASK 6] Initializing Abck class
[TASK 7] Initializing Abck class
[TASK 8] Initializing Abck class
[TASK 9] Initializing Abck class
[TASK 10] Initializing Abck class
[TASK 11] Initializing Abck class
[TASK 12] Initializing Abck class
[TASK 13] Initializing Abck class
[TASK 14] Initializing Abck class
[TASK 15] Initializing Abck class
[TASK 16] Initializing Abck class
[TASK 17] Initializing Abck class
[TASK 18] Initializing Abck class
[TASK 19] Initializing Abck class
[TASK 1] Finished
[TASK 2] Finished
[TASK 3] Finished
[TASK 4] Finished
[TASK 5] Finished
[TASK 6] Finished
[TASK 7] Finished
[TASK 9] Finished
[TASK 8] Finished
[TASK 10] Finished
ctrl + c
ctrl + c
ctrl + c
ctrl + c
ctrl + c
[TASK 13] Finished
[TASK 16] Finished
[TASK 17] Finished
[TASK 18] Finished
[TASK 19] Finished
[TASK 14] Finished
[TASK 12] Finished
[TASK 11] Finished
[TASK 15] Finished
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt(), KeyboardInterrupt()]
适用于 Linux 及其他使用 fork 的平台的解决方案
这种情况更简单,因为中断处理在多进程池中基本可以正常工作。最简单的处理方式仍然是为每个池进程初始化一个全局标志 `running`,工作函数可以定期检查它,并在其为 `False` 时终止。每个池进程都会设置一个 CTRL-C 处理程序,在用户按下 CTRL-C 时把 `running` 设置为 False,这样就能终止所有正在运行的任务。主进程只需处理 `KeyboardInterrupt` 异常即可:
import asyncio
from concurrent.futures import ProcessPoolExecutor
import signal
import time
class TestClass:
    """Example worker class whose ``foo`` method polls the ``running`` flag."""

    def __init__(self) -> None:
        self.value1 = 1
        self.value2 = 2

    def foo(self):
        """Sleep in up to 20 steps of 0.1 s, stopping early once ``running`` is false.

        ``running`` is a module global that must already be defined when this
        method is called. The method always returns ``None``.
        """
        for _ in range(20):
            if not running:
                return
            time.sleep(.1)
async def task(loop, executor_processes, i):
    """Run ``TestClass().foo`` through the executor and print progress.

    Args:
        loop: Event loop whose ``run_in_executor`` schedules the call.
        executor_processes: Executor handed to ``run_in_executor``.
        i: Task number; only used in the progress messages.
    """
    print(f"[TASK {i}] Initializing Abck class")
    new_test = await loop.run_in_executor(executor_processes, TestClass().foo)
    # other async and sync functions contained in TestClass
    print(f"[TASK {i}] Finished")
def ctrl_c_handler(*args, **kwargs):
    """Signal handler that clears the global ``running`` flag.

    All arguments are accepted and ignored.
    """
    global running
    running = False
def init_pool():
    """Pool initializer: set ``running`` to True and install ``ctrl_c_handler`` for SIGINT."""
    global running
    running = True
    signal.signal(signal.SIGINT, ctrl_c_handler)
async def main():
    """Run 99 ``task`` coroutines on a 5-worker pool prepared by ``init_pool``."""
    # The ``with`` block shuts the pool down when the coroutine leaves it,
    # including when the gather below is cancelled.
    with ProcessPoolExecutor(max_workers=5, initializer=init_pool) as executor_processes:
        loop_ = asyncio.get_running_loop()
        tasks = [task(loop_, executor_processes, i) for i in range(1, 100)]
        await asyncio.gather(*tasks)
# Guard the entry point so that worker processes which import this module
# (spawn start method) do not start the program a second time.
if __name__ == '__main__':
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("ctrl + c")
    print('Program finished')
推荐阅读
- php - 如何使用多维数组以及重叠检查从开始时间和结束时间获取总小时数?
- mysql - 使用 Axios 的 Mysql Express Reactjs Nodejs crud 问题
- arrays - how can i write this algorithm more efficiently ? in a way that reduce time complexity?
- azure - 在 azure 批处理池的基础映像上安装 KeyVaultExtension 以进行证书轮换
- java - 无法对对象类型 ArrayList 实现 Filterable
- java - 私有静态字段 Java
- algorithm - Boost Graph:遍历树到单叶的算法
- python - 无法让 Python 从同一目录中的文件导入类
- django - django 或 react ,我应该学习什么来进行全栈开发?
- python - pypy 默认包含哪些包