首页 > 解决方案 > 基于所有正在运行的作业的聚合标准的 Python 多处理作业提交

问题描述

基于 Python 多处理作业提交的所有正在运行的作业的聚合标准

我有一份工作需要在 Teradata 数据库上做一些工作,并将 db 会话数作为参数。数据库对数据库会话数的最大限制为 60。我可以使用多处理有条件地处理作业,以便所有活动子进程中的 sum(num_db_sessions) <= max_num_db_sessions?

我只是在下面粘贴一些伪代码:

import multiprocessing as mp
import time

def dbworker(db_object, num_db_sessions):
    # do work on db_object #####
    # The sum(num_db_sessions) <= max_num_db_sessions 
    print (db_object, num_db_sessions)
    # The db_objs with larger num_db_sessions take longer to finish
    time.sleep(num_db_sessions)
    return

if __name__ == "__main__":
    max_num_db_sessions = 60
    # JobsList (db_object,num_db_sessions)
    jobs_list = [('A', 15), ('B', 15), ('C', 15), ('D', 15)
                , ('E', 1), ('F', 1), ('G', 1), ('H', 1)
                , ('I', 1), ('J', 1), ('K', 1), ('L', 1)
                , ('M', 2), ('N', 1), ('O', 1), ('P', 1)
                , ('Q', 2), ('R', 2), ('S', 2), ('T', 2)
                , ('U', 2), ('V', 2), ('W', 2), ('X', 2)
                , ('Y', 2), ('Z', 2)]
    ## Submit jobs_list to mutltiprocessing ####
    for db_object,num_db_sessions in jobs_list:
        dbworker(db_object,num_db_sessions) ## -->>> sum(num_db_sessions) <=  max_num_db_sessions
    ## Is this possible ??

标签: pythonqueuemultiprocessingshared-state

解决方案


我已经想通了。下面的代码就是这样做的。关键要素是:

1) 运行一个单独的守护进程将任务放入队列。为此的目标函数进行编排

2) 将计数器实现为 multiprocessing.value,它跟踪当前正在运行的会话数。计数器的实现取自https://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing

3) 实现一个 multiprocessing.manager().list() 来跟踪未提交的作业。

4) 使用毒丸通过发送 None * number_of_child_processes 来破坏工作进程,如毒丸方法中所实现的那样。这取自https://pymotw.com/3/multiprocessing/communication.html

worker 函数使用 time.sleep(num_db_sessions) 作为模拟工作负载的方式(更高的处理时间)

这是代码。

import multiprocessing
import time
class Counter(object):
    def __init__(self, initval=0):
        self.val = multiprocessing.Value('i', initval)
        self.lock = multiprocessing.Lock()

    def increment(self,val):
        with self.lock:
            self.val.value += val

    def value(self):
        with self.lock:
            return self.val.value

def queue_manager(tasks,results,jobs_list,counter,max_num_db_sessions,num_consumers):
    proc_name = multiprocessing.current_process().name
    while len(jobs_list) > 0:
        current_counter = counter.value()
        available_sessions = max_num_db_sessions - current_counter
        if available_sessions > 0:
            prop_list = [(p,s) for p,s in jobs_list if s <= available_sessions]
            if (len(prop_list)) > 0:
                with multiprocessing.Lock():
                    print(prop_list[0])
                    tasks.put(prop_list[0][0])
                    jobs_list.remove(prop_list[0])

                counter.increment(prop_list[0][1])
                print("Process: {} -- submitted:{} Counter is:{} Sessions:{}".format(proc_name
                                                                          , prop_list[0][0]
                                                                          , current_counter
                                                                          , available_sessions)
                      )
        else:
            print("Process: {} -- Sleeping:{} Counter is:{} Sessions:{}".format(proc_name
                                                                                 , str(5)
                                                                                 , current_counter
                                                                                 , available_sessions)
            )
            time.sleep(5)
    else:
        for i in range(num_consumers):
            tasks.put(None)

def worker(tasks,counter,proc_list):
    proc_name = multiprocessing.current_process().name
    while True:
        obj = tasks.get()
        if obj is None:
            break
        name,age = [(name,sess) for name,sess in proc_list if name == obj][0]
        print("Process: {} -- Processing:{} Sleeping for:{} Counter is:{}".format(proc_name
                                                                              ,name
                                                                              ,age
                                                                              ,counter.value())
              )
        time.sleep(age)
        counter.increment(-age)
        print("Process: {} -- Exiting:{} Sleeping for:{} Counter is:{}".format(proc_name
                                                                              ,name
                                                                              ,age
                                                                              ,counter.value())
              )

if __name__ == '__main__':
    max_num_db_sessions = 60
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue() # This will be unused now. But will use it.
    mpmanager = multiprocessing.Manager()
    proc_list = [('A', 15), ('B', 15), ('C', 15), ('D', 15)
                , ('E', 1), ('F', 1), ('G', 1), ('H', 1)
                , ('I', 1), ('J', 1), ('K', 1), ('L', 1)
                , ('M', 2), ('N', 1), ('O', 1), ('P', 1)
                , ('Q', 2), ('R', 2), ('S', 2), ('T', 2)
                , ('U', 2), ('V', 2), ('W', 2), ('X', 2)
                , ('Y', 2), ('Z', 2)]
    jobs_list = mpmanager.list(proc_list)
    counter = Counter(0)
    num_cpu = 3
    d = multiprocessing.Process(name='Queue_manager_proc'
                                ,target=queue_manager
                                ,args=(tasks, results, jobs_list, counter
                                       , max_num_db_sessions, num_cpu)
                                )
    d.daemon = True
    d.start()
    jobs = []
    for i in range(num_cpu):
        p = multiprocessing.Process(name="Worker_proc_{}".format(str(i+1))
                                    ,target=worker
                                    ,args=(tasks,counter,proc_list)
                                    )
        jobs.append(p)
        p.start()

    for job in jobs:
        job.join()

    d.join()

推荐阅读