python - 如何根据资源使用情况跟踪和终止池的进程?
问题描述
我正在研究宏基因组学管道,我有一个只能占用一个 CPU 的进程(Prodigal),我一次最多需要运行 5 个。当服务器的“wa”超过 0.3 时,我需要从进程池中终止特定进程,或者以某种方式减少活动工作者的数量。
Prodigal流程结束后
代码的简化版本
def multi_prodigal_processing():
directories = glob.glob(root)
prodigal_pool = Pool(processes=5)
for directory in directories:
prodigal_pool.apply_async(run_prodigal, (directory,))
def monitor_prodigal_usage(pool):
while prodigals_running :
if check_over_usage() :
pool.terminate()
但这会终止所有进程。我想要做的是终止“最年轻”的进程并将池减少到三个活跃的工作人员。问题是我只有 pool.terminate 会终止一切,或者我可以使用 multiprocessing.active_children() 但我认为即使我停止了池,池也会继续添加进程。
在仍然使用池对象的同时有什么好的方法吗?我应该只使用 multiprocess.Process 并创建一个跟踪 PIDS 并使用计数信号量的数据结构吗?包装进程并添加一个停止\启动命令?一定有更简单的方法
解决方案
我已经通过一些更改实现了您的方法,如果您可以看一下,我会喜欢它,也许我忽略了一些缺陷。它运行良好并提供一致的预期输出。(由于评论限制,我将其添加为答案)
from multiprocessing import Process, Manager,Queue # Used for multiprocessing prodigal
from queue import Empty
global process_dict # Keeps track of active prodigal workers
global root_dirs_done # Keeps track of directories processed with prodigal
global root_dirs_done_iterable # Multiprocessing object isn't iterable, data is transferred to this after multiprocessing part is over
global directories # All relevant directories to process from input directory
global root_dirs_queue # Remaining directories to process
global prodigal_lock # Synchs access to youngest_process
global youngest_process # Keeps track of which prodigal input directories is currently being processed,in chronological order
global manager # Synchs shared resources between prodigal workers
def setup():
global dir_glob, directories, protein_file, prodigal_lock, root_dirs_done, root_dirs_queue, youngest_process, manager, process_dict, root_dirs_done_iterable
manager = Manager()
root_dirs_done_iterable = []
root_dirs_queue = manager.Queue()
youngest_process = manager.list()
root_dirs_done = manager.list()
prodigal_lock = manager.Lock()
process_dict = manager.dict()
def prodigal_processing():
parse_and_place = Process(target=parse_and_place_directories_for_processing)
parse_and_place.start()
parse_and_place.join()
setup_workers(5)
monitoring = Process(target=monitor_prodigals)
monitoring.start() # Monitors the workers until they're all inactive
monitoring.join()
post_prodigal_processing()
def parse_and_place_directories_for_processing():
global root_dirs_queue
for directory in directories: # Parse directories in input directory and
if check_directory(directory):
root_dirs_queue.put(directory)
def setup_workers(n):
global process_dict
for i in range(n):
p = ProdigalWorker(i)
process_dict[i] = p
for p in process_dict.values():
p.start()
class ProdigalWorker:
def __init__(self, i):
self.key = i
self.directory = ""
self.process = None
self.work = True
def start(self):
self.process = Process(target=self.start_processing)
self.process.start()
def start_processing(self):
global prodigal_lock, youngest_process, root_dirs_done, root_dirs_queue, process_dict
while self.work:
if not root_dirs_queue.empty():
try:
self.directory = root_dirs_queue.get_nowait() # Worker attempts to get a directory from the queue if the queue isn't empty
# print("Directory Grabbed by ", self.key)
prodigal_lock.acquire()
youngest_process.append(self.key) # Worker updates that they have begun processing
prodigal_lock.release()
run_prodigal(self.directory) # Processing begins
prodigal_lock.acquire()
youngest_process.remove(self.key) # Signal that they are done to monitor
prodigal_lock.release()
root_dirs_done.append(self.directory)
except Empty:
process_dict.pop(self.key)
break
else:
process_dict.pop(self.key) # If the queue is empty, process exits and signals to the monitor it isn't processing any more directories
break
def monitor_prodigals():
global youngest_process, prodigal_lock, root_dirs_queue
while True:
if len(process_dict) == 0: # Break condition is all workers are done
break
if len(process_dict) > 3:
if get_wa_usage() > 0.3:
prodigal_lock.acquire()
worker = process_dict.pop(youngest_process.pop())
worker.work = False
worker.process.terminate()
root_dirs_queue.put(worker.directory)
prodigal_lock.release()
else:
for p in process_dict.values(): # If there are equal\less than the minimum, wait for what's running to finish
try:
p.process.join()
except AttributeError:
continue
if len(process_dict)==0:
break
else:
continue
def main():
setup()
prodigal_processing()
布布的评论
我的评论太长了,不能成为实际评论,所以请原谅我对您帖子的更新:
- 在
prodigal_processing
两次函数中,您启动一个进程,然后立即调用join
该进程。所以你会立即阻塞主进程,直到新创建的子进程完成。因此,您没有并行运行任何东西,并且通过创建这些新子流程没有任何收获。您应该直接调用parse_and_place_directories_for_processing
并monitor_prodigals
获得相同但更有效的效果(没有创建新Process
实例的开销)。post_prodigal_processing
没有定义,所以我不知道应该做什么,也不能对此发表评论。 - 在函数
setup_workers
中,您管理了字典,process_dict
您将每个ProdigalWorker
实例存储为一个值,但您使用的键只是 0、1、... n-1。当然,一份清单也同样有意义,而且效率更高。 - 就像 99.99% 的发布问题的人一样
multiprocessing
,您也没有按照 SO 指南的要求使用您正在运行的实际平台(Linux?Windows?)来标记问题。如果您说您的代码有效,那么我推断它一定是 Linux 或其他fork
用于创建新进程的平台,因为您没有将输入队列显式传递给新的子进程,而是依赖它们来访问输入队列通过全局变量root_dirs_queue
。但在 Windows 等平台上,这将永远无法工作。每个新进程都将从头开始执行源代码,作为其初始化的一部分,在方法start_processing
实际运行之前初始为空地址空间,因此只有全局范围内的代码将作为初始化的一部分执行,并且root_dirs_queue
将保持未初始化。您的代码本身没有错;它只是非常依赖于平台。 append
您正在使用托管锁将操作的“单线程”强制为youngest_process
. 这是完全没有必要的,因为append
操作一开始是线程安全的。同样的remove
操作。不一致的是,您发现在附加到root_dirs_done
. 稍后我将对此有更多的发言权。- 调用返回的托管队列等托管结构会
manager.Queue
带来相当多的开销。您应该改用root_dirs_queue = multiprocessing.Queue()
. - 在
monitor_prodigals
,而不是if len(process_dict) == 0:
它是更多的 Pythonic 编码if not process_dict:
。无论process_dict
是 adict
还是 a都有效list
。但在以下语句中,您正在测试process_dict
. 因此,最初将长度分配给变量可能会更好:l = len(process_dict); if l == 0: break; if l > 3: etc.
- 有些事情似乎不太对劲
monitor_prodigals
。当它开始处理一个目录时将自己添加到列表中,然后在它完成时将自己删除,然后返回查看是否有另一个目录要处理。如果没有,它将自己从字典(或列表)中删除。有些处理窗口中的项目数不等于其中的项目数,即,每当一个或多个进程刚刚完成对目录的处理并已将自己从其中删除时。可以测试长度并发现它大于 3 但是当它从start_processing
start_processing
youngest_process
process_dict
process_dict
youngest_process
youngest_process
monitor_prodigals
process_dict
process_dict
它发现它是空的。诚然,这是极不可能的。但不太可能有 4 个进程,但现在只有 3 个在,youngest_process
因为其中一个进程刚刚完成了其目录的处理。如果这是最年轻的,而它恰好已经完成了怎么办?您现在将弹出错误的进程。虽然append
andremove
是线程安全的操作,我说你不需要为这些操作单独使用锁,但如果你想将这些操作组合为一个原子操作,你确实需要使用锁,我认为这就是你想要的去做:
def setup_workers(n):
global process_dict
for i in range(n):
p = ProdigalWorker(i)
process_dict[i] = p
# Initialize youngest_process:
youngest_process.append(p.key)
for p in process_dict.values():
p.start()
class ProdigalWorker:
...
def start_processing(self):
global prodigal_lock, youngest_process, root_dirs_done, root_dirs_queue, process_dict
while True:
try:
with prodigal_lock:
self.directory = root_dirs_queue.get_nowait() # Worker attempts to get a directory from the queue if the queue isn't empty
youngest_process.remove(self.key)
youngest_process.append(self.key)
run_prodigal(self.directory) # Processing begins
root_dirs_done.append(self.directory)
except Empty:
with prodigal_lock:
youngest_process.remove(self.key)
process_dict.pop(self.key)
break
def monitor_prodigals():
global youngest_process, prodigal_lock, root_dirs_queue
while len(process_dict) > 3:
if get_wa_usage() > 0.3:
with prodigal_lock:
# still > 3?
if len(process_dict) <= 3:
break
worker = process_dict.pop(youngest_process.pop())
worker.process.terminate()
root_dirs_queue.put(worker.directory)
with prodigal_lock:
process = [p.process for p in process_dict.values()]
for p in process:
p.join()
请注意,我在使用实例时删除了不必要和不可靠的检查。我也使用锁作为上下文管理器。更改代码的结果是,我似乎从未从中删除任何内容,而只是重新排列顺序。中的元素数将始终等于 中的元素数。if not root_dirs_queue.empty():
multiprocessing.Queue
youngest_process
youngest_process
process_dict
请注意,我已经进行了其他代码更改,因为坦率地说,我没有理解您代码的一些细微之处。尤其是在monitor_prodigals
. 这个想法是,当您因为使用而需要终止进程时,您会将目录放回队列中。你刚刚终止的进程已经死了,它的状态self.work
标志在这一点上是无关紧要的。至于仍在运行的其他任务,如果您正在将来自已取消进程的目录放回队列中,即使它们暂时发现队列为空,您也希望它们继续运行。但问题是在队列获得空状态后,它会跳出 get 循环,并且可能会错过被放回队列的目录。如果它没有跳出循环并且只要为self.work
True就继续循环,那么它将无限循环。
我根本不会使用self.work
。相反,我只使用锁访问输入队列。如果一个进程因为使用而终止,那么取消进程和重新排队目录的操作也被置于锁的控制之下。
现在是你需要仔细检查我的更改。
进一步更新
我认为您的设计存在根本缺陷。您正在向ProdigalWorker
托管字典添加实例,但在调用方法后,start_processing
您正在使用实例初始化属性,Process
从而间接将Process
实例添加到托管字典中。但是Process
实例不能跨地址空间/不同进程共享,所以这是非法操作。请参阅以下程序:
from multiprocessing import Manager, Process
def foo(d):
print(d['x'].p)
class X:
def __init__(self):
self.p = None
def main():
manager = Manager()
d = manager.dict()
x = X()
d['x'] = x
print('None' if d['x'].p is None else 'Not None')
# Now add a Process attribute:
x.p = Process(target=foo, args=(d,))
# Still None because the managed dictionary doesn't know of the update:
print('None' if d['x'].p is None else 'Not None')
# We must reassign the key to let the dictionary know of the change.
# But this will now raise an exception:
d['x'] = x
if __name__ == '__main__':
main()
印刷:
None
None
Traceback (most recent call last):
File "C:\Ron\test\test.py", line 24, in <module>
main()
File "C:\Ron\test\test.py", line 21, in main
d['x'] = x
File "<string>", line 2, in __setitem__
File "C:\Program Files\Python38\lib\multiprocessing\managers.py", line 834, in _callmethod
conn.send((self._id, methodname, args, kwds))
File "C:\Program Files\Python38\lib\multiprocessing\connection.py", line 206, in send
self._send_bytes(_ForkingPickler.dumps(obj))
File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
cls(buf, protocol).dump(obj)
File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 347, in __reduce__
raise TypeError(
TypeError: Pickling an AuthenticationString object is disallowed for security reasons
您的代码不会引发异常,因为即使在方法中start
您正在将Process
属性设置为已存储到托管字典中的对象中...
self.process = Process(target=self.start_processing)
...托管字典本身尚未使用更新的对象进行更新。但这意味着什么时候monitor_prodigals
去终止最年轻的工人并执行......
worker = process_dict.pop(youngest_process.pop())
worker.work = False
worker.process.terminate()
...worker.process
不会被设置,这应该会引发异常。
推荐阅读
- java - 使用带有 Java 枚举的 lombok @Getter 注解
- javascript - 使用 Filereader API 时如何设置下载的文件名?
- twitter-bootstrap - Bootstrap 拉伸链接在 Chrome 和 Edge 上断开,但在 Firefox 上有效
- swift - 如何让 RealityKit 只显示 CollisionComponents?
- rest - ReST - PUT vs PATCH 以在添加新属性时最大限度地减少客户端和 API 之间的耦合
- azure - 有没有办法阻止 azure app 服务的插槽交换
- html - 如何判断哪个 html 元素在服务器端发送了 http 请求?
- python - Django API 日志到 LogStash
- windows - 在操作系统启动时启动提升的应用程序,没有 UAC 提示
- keras - 检查输入时出错:预期 conv2d_19_input 有 4 个维度,但得到了形状为 (274, 1) 深度学习的数组