python - 你如何让多处理池不启动新进程但也不终止当前正在运行的进程?
问题描述
我正在使用 Pythonmultiprocessing.Pool
类Python 2.7
。我有大量只能在一天中的某个时间段内运行的作业。每项工作都需要一些时间。我想限制作业一次最多运行 n 个并行。
池功能可以很好地限制并行作业的数量,但是当我试图结束这些作业时它似乎有问题。当我在窗口结束时,我希望当前正在运行的作业完成它们的处理。我不想开始新的工作。我一直在尝试使用 来执行此操作Pool.close()
,这确实可以让我正在运行的进程按需要完成,但从实验看来,即使在池关闭后,队列中但尚未开始处理的作业仍将提交处理。
另一种选择是Pool.terminate()
主动关闭正在运行的作业,这违背了预期的行为。
功能 | 允许正在运行的作业完成 | 阻止新工作开始 |
---|---|---|
.terminate() | 不 | 是的 |
。关() | 是的 | 不 |
期望的行为 | 是的 | 是的 |
解决方案
首先,你不应该使用 Python2.7,它已经被弃用了一段时间。
您应该使用标准库ProcessPoolExecutor
中的aconcurrent.futures
并在激活标志的情况下调用该.shutdown()
方法,cancel_futures
以让执行程序完成已启动的作业但取消任何待处理的工作。
from concurrent.futures import ProcessPoolExecutor
parallel_jobs = 4 # The pool size
executor = ProcessPoolExecutor(parallel_jobs)
future_1 = executor.submit(work_1, argument_1)
...
future_n = executor.submit(work_n, argument_n)
...
# At some point when the time window ends and you need to stop jobs:
executor.shutdown(cancel_futures=True)
# Some futures such as future_n may have been cancelled here, you can check that:
if future_n.cancelled():
print("job n has been cancelled")
# Or you can try to get the result while checking for CancelledError:
try:
result_n = future_n.result()
except CancelledError:
print("job n hasn't finished in the given time window")
以下是取消的示例:
from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from time import sleep
# The job to execute concurrently
def foo(i: int) -> str:
sleep(0.2)
print(i)
return f"{i}"
e = ThreadPoolExecutor(4)
# Jobs are scheduled concurrently, this call does not block
futures = [e.submit(foo, i) for i in range(100)]
# Shutdown during execution and cancel pending jobs
e.shutdown(cancel_futures=True)
# Gather completed results
results = [f.result() for f in futures if not f.cancelled()]
print(results)
如果您执行此代码,您会看到 100 个计划的作业并未全部完成,只有一些是因为执行程序已在其间关闭。
推荐阅读
- dart - Column 中的 Flutter 定位小部件
- angular - 浏览器取消来自 Angular 应用程序的 http 请求
- css - 编译 SCSS 会生成超过 3 MB 的 CSS 文件
- selenium - 如何使用 selenium webdriver 定位元素
- bash - 为什么 cat 只打印文件的第一行和最后一行?
- c# - 在 Xamarin 中创建堆栈时“对象未设置为对象的实例”
- javascript - 我怎样才能用这个例子做一个模拟函数?
- go - 在 Go 中向特定客户端发送 Websocket 消息(使用 Gorilla)
- php - CodeIgniter:所有视图中的 $data 访问
- sonarqube - 用于代码气味的 SonarQube Web API - 技术债务计数