首页 > 解决方案 > 如何了解 python 多处理上的活跃工作者

问题描述

我开发了一个代码,它根据其他 conf 文件和名为 config.threadsTaskApi 的变量创建 N 个工作进程。这会并行运行 X 个工作进程并始终使用这 X 个工作进程。每个工作进程必须从数据库中识别 Y 条 limit_WF 记录并将其发送到 API。

在某些时候,如果只有 1 个工作进程处于活动状态,我想引起注意,因为当我按 client_id 分配工作进程时,可能有一个 client_id 在数据库中有 100,000 条记录,而其他的只有 5,000 条。因此,其他 client_id 的进程先完成,最后只剩 1 个子进程工作者处于活动状态。process() 函数由其他 .py 文件调用。

当这种情况发生时,我希望引起注意并改变行为,让 X 数量的工作人员处理这个孩子的进程,以使其工作并行。因此,我将始终激活并行性。这是代码:

class BigDHistoric:
    """Drains historic records from the ``bigd_queue`` table and forwards
    them to the Tasks API, fanning out one worker process per ``client_id``.

    ``process()`` is the entry point (called from other .py files); it
    discovers the pending client_ids and maps them over a multiprocessing
    Pool of ``config.threadsTaskApi`` workers, each running
    ``updateBigdQueueJsonBatch``.
    """

    def __init__(self):
        # Config() presumably loads DB credentials and API settings from
        # conf files — reused by every method. TODO confirm it is cheap
        # enough to construct per instance.
        print("BigD Historic object created")
        self.config = Config()

    def updateBigdQueueJsonBatch(self, client_id):
        """Send all unsent ``bigd_queue`` rows for *client_id* to the Tasks API.

        Fetches up to ``config.limit_WF`` workfile ids per batch and POSTs
        them to the ``WorkfileStats/Batch/`` controller, looping until a
        short (or empty) batch indicates the queue is drained. Runs inside
        a child process spawned by :meth:`process`.

        Fixes vs. the original:
        - SQL is parameterized instead of string-concatenated (injection risk).
        - A transient API error now sleeps and retries via ``continue``
          instead of recursing into itself, which was unbounded and caused
          the outer loop to re-fetch (duplicate posts) after the recursion
          returned.
        - The connection is closed exactly once per iteration; no
          double-close on the short-batch path.

        :param client_id: client whose queued workfiles are processed.
        """
        batchConn = None
        processed_workfiles = 0
        # HTTP statuses treated as transient API failures worth retrying.
        retry_statuses = {401, 404, 500, 503, 504}
        try:
            print("Processing client_id: "+str(client_id)+"; historic data bigd_queue with process id: "+str(os.getpid()))
            tasksApi = TasksApi()
            token = tasksApi.getTasksApiToken()
            while True:
                print('1 - CLIENT_ID: '+str(client_id)+" BEFORE FETCH , processed_workfiles: "+str(processed_workfiles))
                # Reconnect each batch: connections must not be shared
                # across the long-lived API round-trips below.
                batchConn = psycopg2.connect(
                    user=self.config.rds_data_user,
                    password=self.config.rds_data_password,
                    host=self.config.rds_data_host,
                    port="5432",
                    database=self.config.rds_data_database,
                )
                cur = batchConn.cursor()
                # Parameterized query — the driver escapes the values.
                cur.execute(
                    "select workfile_id from bigd_queue"
                    " where sent = 0 and message_code = 1 and client_id = %s"
                    " order by id limit %s",
                    (client_id, self.config.limit_WF),
                )
                rowcount = cur.rowcount
                if rowcount <= 0:
                    # Queue drained for this client.
                    break
                listWf = [wf[0] for wf in cur.fetchall()]
                cur.close()
                batchConn.close()
                batchConn = None  # prevent double-close in finally
                data = {"client_id": client_id, "workfile_id": listWf, "use_read_replica": 1, "verbose": self.config.verboseTaskApi}
                controller = 'WorkfileStats/Batch/'

                start_time = time.time()
                r = tasksApi.post(controller, data, token)
                elapsed_time = round(time.time() - start_time, 2)

                print("2 - CLIENT_ID: "+str(client_id)+" RESULT: "+str(r.status_code))
                if r.status_code in retry_statuses:
                    self.createErrorLogPost(client_id,"[Cron_bigd.bigd_historic] Error returning status from TaskAPI: "+str(r.status_code)+" client_id: "+str(client_id)+"; workfile_list: "+str(listWf)+"; elapsed_time response: "+str(elapsed_time))
                    if(self.config.verboseTaskApi == 1):
                        self.createLogPostMessage(client_id,0,"[Cron_bigd.bigd_historic] returning status from TaskAPI: "+str(r.status_code)+" client_id: "+str(client_id)+"; workfile_list: "+str(listWf)+"; elapsed_time response: "+str(elapsed_time))
                    # Back off, then retry the same batch (rows are still
                    # marked sent = 0, so the next fetch picks them up).
                    time.sleep(35)
                    continue

                processed_workfiles += rowcount
                print("3 - CLIENT_ID: "+str(client_id)+" processed_workfiles: "+str(processed_workfiles))
                if(self.config.verboseTaskApi == 1):
                    self.createLogPostMessage(client_id,0,"[Cron_bigd.bigd_historic] returning status from TaskAPI: "+str(r.status_code)+" client_id: "+str(client_id)+"; workfile_list: "+str(listWf)+"; elapsed_time response: "+str(elapsed_time))
                if rowcount < self.config.limit_WF:
                    # Short batch: nothing more queued for this client.
                    break
        except Exception as e:
            print(e)
            raise e
        finally:
            print("PROCESSED client_id: "+str(client_id)+"; historic data bigd_queue with process id: "+str(os.getpid()))
            if batchConn:
                batchConn.close()

    def process(self):
        """Entry point: fan pending client_ids out over a worker Pool.

        Collects the distinct client_ids with unsent ``message_code = 1``
        rows in ``bigd_queue`` and maps :meth:`updateBigdQueueJsonBatch`
        over them with ``config.threadsTaskApi`` processes. Exits the
        interpreter with status 1 on any failure.

        NOTE(review): one client_id with far more rows than the rest will
        leave a single worker running at the end — splitting each client's
        backlog into fixed-size chunks before mapping would keep all
        workers busy.
        """
        connBigD = None
        try:
            print("--------------------------------------")
            print("STARTING BIGD_HISTORIC FROM BIGD_QUEUE")
            print("--------------------------------------")
            connBigD = psycopg2.connect(
                user=self.config.rds_data_user,
                password=self.config.rds_data_password,
                host=self.config.rds_data_host,
                port="5432",
                database=self.config.rds_data_database,
            )
            # Historic records generated by the ifs wf_send_stats task.
            curBatch = connBigD.cursor()
            curBatch.execute(
                "select client_id from bigd_queue where message_code = 1 and sent = 0 group by client_id order by client_id"
            )
            cliList = [row_client[0] for row_client in curBatch.fetchall()] if curBatch.rowcount > 0 else []
            curBatch.close()
            connBigD.close()
            connBigD = None  # prevent double-close in finally

            if cliList:
                # map_async + get() propagates any worker exception here.
                p = Pool(self.config.threadsTaskApi)
                result = p.map_async(
                    self.updateBigdQueueJsonBatch,
                    cliList,
                )
                result.get()
                p.close()
                p.join()
        except Exception as e:
            print("Error Exception on bigd_historic: "+str(e))
            self.createErrorLogPost(0,"[Cron_bigd.bigd_historic] Error: "+str(e))
            sys.exit(1)
        finally:
            print("--------------------------------------------")
            print("ENDING PROCESS BIGD_HISTORIC FROM BIGD_QUEUE")
            print("--------------------------------------------")
            if connBigD:
                connBigD.close()

标签: pythonpython-3.x

解决方案


推荐阅读