首页 > 解决方案 > 你如何让多处理池不启动新进程但也不终止当前正在运行的进程?

问题描述

我正在使用 Pythonmultiprocessing.PoolPython 2.7。我有大量只能在一天中的某个时间段内运行的作业。每项工作都需要一些时间。我想限制作业一次最多运行 n 个并行。

池功能可以很好地限制并行作业的数量,但是当我试图结束这些作业时它似乎有问题。当我在窗口结束时,我希望当前正在运行的作业完成它们的处理。我不想开始新的工作。我一直在尝试使用 来执行此操作Pool.close(),这确实可以让我正在运行的进程按需要完成,但从实验看来,即使在池关闭后,队列中但尚未开始处理的作业仍将提交处理。

另一种选择是Pool.terminate()主动关闭正在运行的作业,这违背了预期的行为。

功能 允许正在运行的作业完成 阻止新工作开始
.terminate() 是的
。关() 是的
期望的行为 是的 是的

标签: pythonpython-2.7multiprocessingpool

解决方案


首先,你不应该使用 Python2.7,它已经被弃用了一段时间。

您应该使用标准库ProcessPoolExecutor中的aconcurrent.futures并在激活标志的情况下调用该.shutdown()方法,cancel_futures以让执行程序完成已启动的作业但取消任何待处理的工作。

from concurrent.futures import ProcessPoolExecutor

parallel_jobs = 4  # The pool size
executor = ProcessPoolExecutor(parallel_jobs)

future_1 = executor.submit(work_1, argument_1)
...
future_n = executor.submit(work_n, argument_n)

...
# At some point when the time window ends and you need to stop jobs:
executor.shutdown(cancel_futures=True)

# Some futures such as future_n may have  been cancelled here, you can check that:
if future_n.cancelled():
    print("job n has been cancelled")

# Or you can try to get the result while checking for CancelledError:

try:
    result_n = future_n.result()
except CancelledError:
    print("job n hasn't finished in the given time window")

以下是取消的示例:

from concurrent.futures import ThreadPoolExecutor, as_completed, wait
from time import sleep

# The job to execute concurrently
def foo(i: int) -> str:
    sleep(0.2)
    print(i)
    return f"{i}"

e = ThreadPoolExecutor(4)

# Jobs are scheduled concurrently, this call does not block
futures = [e.submit(foo, i) for i in range(100)]

# Shutdown during execution and cancel pending jobs
e.shutdown(cancel_futures=True)

# Gather completed results
results = [f.result() for f in futures if not f.cancelled()]
print(results)

如果您执行此代码,您会看到 100 个计划的作业并未全部完成,只有一些是因为执行程序已在其间关闭。


推荐阅读