首页 > 解决方案 > 如何根据资源使用情况跟踪和终止池的进程?

问题描述

我正在研究宏基因组学管道,我有一个只能占用一个 CPU 的进程(Prodigal),我一次最多需要运行 5 个。当服务器的“wa”超过 0.3 时,我需要从进程池中终止特定进程,或者以某种方式减少活动工作者的数量。

Prodigal流程结束后

代码的简化版本

 def multi_prodigal_processing():
    directories = glob.glob(root)
    prodigal_pool = Pool(processes=5)
    for directory in directories:
        prodigal_pool.apply_async(run_prodigal, (directory,))   
def monitor_prodigal_usage(pool):

     while prodigals_running :
          if check_over_usage() :
              pool.terminate()

但这会终止所有进程。我想要做的是终止“最年轻”的进程并将池减少到三个活跃的工作人员。问题是我只有 pool.terminate 会终止一切,或者我可以使用 multiprocessing.active_children() 但我认为即使我停止了池,池也会继续添加进程。

在仍然使用池对象的同时有什么好的方法吗?我应该只使用 multiprocess.Process 并创建一个跟踪 PIDS 并使用计数信号量的数据结构吗?包装进程并添加一个停止\启动命令?一定有更简单的方法

标签: pythonmultiprocessingresourcesthreadpoolbioinformatics

解决方案


我已经通过一些更改实现了您的方法,如果您可以看一下,我会喜欢它,也许我忽略了一些缺陷。它运行良好并提供一致的预期输出。(由于评论限制,我将其添加为答案)

from multiprocessing import Process, Manager,Queue # Used for multiprocessing prodigal
from queue import Empty
global process_dict  # Keeps track of active prodigal workers
global root_dirs_done  # Keeps track of directories processed with prodigal
global root_dirs_done_iterable  # Multiprocessing object isn't iterable, data is transferred to this after multiprocessing part is over
global directories  # All relevant directories to process from input directory
global root_dirs_queue  # Remaining directories to process
global prodigal_lock  # Synchs access to youngest_process
global youngest_process  # Keeps track of which prodigal input directories is currently being processed,in chronological order
global manager  # Synchs shared resources between prodigal workers

def setup():
    global dir_glob, directories, protein_file, prodigal_lock, root_dirs_done, root_dirs_queue, youngest_process, manager, process_dict, root_dirs_done_iterable
    manager = Manager()
    root_dirs_done_iterable = []
    root_dirs_queue = manager.Queue()
    youngest_process = manager.list()
    root_dirs_done = manager.list()
    prodigal_lock = manager.Lock()
    process_dict = manager.dict()

def prodigal_processing():
    parse_and_place = Process(target=parse_and_place_directories_for_processing)
    parse_and_place.start()
    parse_and_place.join()
    setup_workers(5)
    monitoring = Process(target=monitor_prodigals)
    monitoring.start()  # Monitors the workers until they're all inactive
    monitoring.join()
    post_prodigal_processing() 

def parse_and_place_directories_for_processing():
    global root_dirs_queue
    for directory in directories:  # Parse directories in input directory and 
        if check_directory(directory):   
            root_dirs_queue.put(directory)
def setup_workers(n):
    global process_dict
    for i in range(n):
        p = ProdigalWorker(i)
        process_dict[i] = p
    for p in process_dict.values():
        p.start()

class ProdigalWorker:
    def __init__(self, i):
        self.key = i
        self.directory = ""
        self.process = None
        self.work = True

    def start(self):
        self.process = Process(target=self.start_processing)
        self.process.start()

    def start_processing(self):
        global prodigal_lock, youngest_process, root_dirs_done, root_dirs_queue, process_dict
        while self.work:
            if not root_dirs_queue.empty():
                try:
                    self.directory = root_dirs_queue.get_nowait()  # Worker attempts to get a directory from the queue if the queue isn't empty
                    # print("Directory Grabbed by ", self.key)
                    prodigal_lock.acquire()
                    youngest_process.append(self.key)  # Worker updates that they have begun processing
                    prodigal_lock.release()
                    run_prodigal(self.directory)  # Processing begins
                    prodigal_lock.acquire()
                    youngest_process.remove(self.key)  # Signal that they are done to monitor
                    prodigal_lock.release()
                    root_dirs_done.append(self.directory) 
                except Empty:
                    process_dict.pop(self.key)
                    break


            else:
                process_dict.pop(self.key)  # If the queue is empty, process exits and signals to the monitor it isn't processing any more directories
                break

def monitor_prodigals():
    global youngest_process, prodigal_lock, root_dirs_queue
    while True:
        if len(process_dict) == 0:  # Break condition is all workers are done
            break
        if len(process_dict) > 3:  
            if get_wa_usage() > 0.3:  
                prodigal_lock.acquire()
                worker = process_dict.pop(youngest_process.pop())
                worker.work = False
                worker.process.terminate()
                root_dirs_queue.put(worker.directory)
                prodigal_lock.release()
        else:
            for p in process_dict.values():  # If there are equal\less than the minimum, wait for what's running to finish
                try:
                    p.process.join()
                except AttributeError:
                    continue
            if len(process_dict)==0:
                break
            else:
                continue
def main():
    setup()
    prodigal_processing()

布布的评论

我的评论太长了,不能成为实际评论,所以请原谅我对您帖子的更新:

  1. prodigal_processing两次函数中,您启动一​​个进程,然后立即调用join该进程。所以你会立即阻塞主进程,直到新创建的子进程完成。因此,您没有并行运行任何东西,并且通过创建这些新子流程没有任何收获。您应该直接调用parse_and_place_directories_for_processingmonitor_prodigals获得相同但更有效的效果(没有创建新Process实例的开销)。post_prodigal_processing没有定义,所以我不知道应该做什么,也不能对此发表评论。
  2. 在函数setup_workers中,您管理了字典,process_dict您将每个ProdigalWorker实例存储为一个值,但您使用的键只是 0、1、... n-1。当然,一份清单也同样有意义,而且效率更高。
  3. 就像 99.99% 的发布问题的人一样multiprocessing,您也没有按照 SO 指南的要求使用您正在运行的实际平台(Linux?Windows?)来标记问题。如果您说您的代码有效,那么我推断它一定是 Linux 或其他fork用于创建新进程的平台,因为您没有将输入队列显式传递给新的子进程,而是依赖它们来访问输入队列通过全局变量root_dirs_queue。但在 Windows 等平台上,这将永远无法工作。每个新进程都将从头开始执行源代码,作为其初始化的一部分,在方法start_processing实际运行之前初始为空地址空间,因此只有全局范围内的代码将作为初始化的一部分执行,并且root_dirs_queue将保持未初始化。您的代码本身没有错;它只是非常依赖于平台。
  4. append您正在使用托管锁将操作的“单线程”强制为youngest_process. 这是完全没有必要的,因为append操作一开始是线程安全的。同样的remove操作。不一致的是,您发现在附加到root_dirs_done. 稍后我将对此有更多的发言权。
  5. 调用返回的托管队列等托管结构会manager.Queue带来相当多的开销。您应该改用root_dirs_queue = multiprocessing.Queue().
  6. monitor_prodigals,而不是if len(process_dict) == 0:它是更多的 Pythonic 编码if not process_dict:。无论process_dict是 adict还是 a都有效list。但在以下语句中,您正在测试process_dict. 因此,最初将长度分配给变量可能会更好:l = len(process_dict); if l == 0: break; if l > 3: etc.
  7. 有些事情似乎不太对劲monitor_prodigals。当它开始处理一个目录时将自己添加到列表中,然后在它完成时将自己删除,然后返回查看是否有另一个目录要处理。如果没有,它将自己从字典(或列表)中删除。有些处理窗口中的项目数不等于其中的项目数,即,每当一个或多个进程刚刚完成对目录的处理并已将自己从其中删除时。可以测试长度并发现它大于 3 但是当它从start_processingstart_processingyoungest_processprocess_dictprocess_dictyoungest_processyoungest_processmonitor_prodigalsprocess_dictprocess_dict它发现它是空的。诚然,这是极不可能的。但不太可能有 4 个进程,但现在只有 3 个在,youngest_process因为其中一个进程刚刚完成了其目录的处理。如果这是最年轻的,而它恰好已经完成了怎么办?您现在将弹出错误的进程。虽然appendandremove是线程安全的操作,我说你不需要为这些操作单独使用锁,但如果你想将这些操作组合为一个原子操作,你确实需要使用锁,我认为这就是你想要的去做:
def setup_workers(n):
   global process_dict
   for i in range(n):
       p = ProdigalWorker(i)
       process_dict[i] = p
       # Initialize youngest_process:
       youngest_process.append(p.key)
   for p in process_dict.values():
       p.start()

class ProdigalWorker:
    ...
    def start_processing(self):
        global prodigal_lock, youngest_process, root_dirs_done, root_dirs_queue, process_dict
        while True:
            try:
                with prodigal_lock:
                    self.directory = root_dirs_queue.get_nowait()  # Worker attempts to get a directory from the queue if the queue isn't empty
                    youngest_process.remove(self.key)
                    youngest_process.append(self.key)
                run_prodigal(self.directory)  # Processing begins
                root_dirs_done.append(self.directory) 
            except Empty:
                with prodigal_lock:
                    youngest_process.remove(self.key)
                    process_dict.pop(self.key)
                break

def monitor_prodigals():
    global youngest_process, prodigal_lock, root_dirs_queue
    while len(process_dict) > 3:
        if get_wa_usage() > 0.3:
            with prodigal_lock:
                # still > 3?
                if len(process_dict) <= 3:
                    break
                worker = process_dict.pop(youngest_process.pop())
                worker.process.terminate()
                root_dirs_queue.put(worker.directory)
    with prodigal_lock:
        process = [p.process for p in process_dict.values()]
    for p in process:
        p.join()

请注意,我在使用实例时删除了不必要和不可靠的检查。我也使用锁作为上下文管理器。更改代码的结果是,我似乎从未从中删除任何内容,而只是重新排列顺序。中的元素数将始终等于 中的元素数。if not root_dirs_queue.empty():multiprocessing.Queueyoungest_processyoungest_processprocess_dict

请注意,我已经进行了其他代码更改,因为坦率地说,我没有理解您代码的一些细微之处。尤其是在monitor_prodigals. 这个想法是,当您因为使用而需要终止进程时,您会将目录放回队列中。你刚刚终止的进程已经死了,它的状态self.work标志在这一点上是无关紧要的。至于仍在运行的其他任务,如果您正在将来自已取消进程的目录放回队列中,即使它们暂时发现队列为空,您也希望它们继续运行。但问题是在队列获得空状态后,它会跳出 get 循环,并且可能会错过被放回队列的目录。如果它没有跳出循环并且只要为self.workTrue就继续循环,那么它将无限循环。

我根本不会使用self.work。相反,我只使用锁访问输入队列。如果一个进程因为使用而终止,那么取消进程和重新排队目录的操作也被置于锁的控制之下。

现在是需要仔细检查我的更改。

进一步更新

我认为您的设计存在根本缺陷。您正在向ProdigalWorker托管字典添加实例,但在调用方法后,start_processing您正在使用实例初始化属性,Process从而间接将Process实例添加到托管字典中。但是Process实例不能跨地址空间/不同进程共享,所以这是非法操作。请参阅以下程序:

from multiprocessing import Manager, Process

def foo(d):
    print(d['x'].p)

class X:
    def __init__(self):
        self.p = None

def main():
    manager = Manager()
    d = manager.dict()
    x = X()
    d['x'] = x
    print('None' if d['x'].p is None else 'Not None')
    # Now add a Process attribute:
    x.p = Process(target=foo, args=(d,))
    # Still None because the managed dictionary doesn't know of the update:
    print('None' if d['x'].p is None else 'Not None')
    # We must reassign the key to let the dictionary know of the change.
    # But this will now raise an exception:
    d['x'] = x

if __name__ == '__main__':
    main()

印刷:

None
None
Traceback (most recent call last):
  File "C:\Ron\test\test.py", line 24, in <module>
    main()
  File "C:\Ron\test\test.py", line 21, in main
    d['x'] = x
  File "<string>", line 2, in __setitem__
  File "C:\Program Files\Python38\lib\multiprocessing\managers.py", line 834, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "C:\Program Files\Python38\lib\multiprocessing\connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "C:\Program Files\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
  File "C:\Program Files\Python38\lib\multiprocessing\process.py", line 347, in __reduce__
    raise TypeError(
TypeError: Pickling an AuthenticationString object is disallowed for security reasons

您的代码不会引发异常,因为即使在方法中start您正在将Process属性设置为已存储到托管字典中的对象中...

    self.process = Process(target=self.start_processing)

...托管字典本身尚未使用更新的对象进行更新。但这意味着什么时候monitor_prodigals去终止最年轻的工人并执行......

                worker = process_dict.pop(youngest_process.pop())
                worker.work = False
                worker.process.terminate()

...worker.process不会被设置,这应该会引发异常。


推荐阅读