python - 如何了解 python 多处理上的活跃工作者
问题描述
我开发了一个代码,它根据其他 conf 文件和名为:config.threadsTaskApi 的变量创建 N 个工作人员。这会并行运行 X 个工作人员并始终使用此 X 个工作人员。每个工作人员必须从数据库中识别 Y limit_WF 并将其发送到 API。
在某些时候,如果只剩 1 个工作进程处于活动状态,我想得到提醒。因为当我按 client_id 分配任务时,可能有一个 client_id 在数据库里有 100.000 条记录,而其他的只有 5.000 条。于是其他 client_id 的任务都已完成,只剩 1 个子工作进程还在运行。process() 函数由其他 .py 文件调用。
当这种情况发生时,我希望能检测到并改变行为:让全部 X 个工作进程共同处理这个剩余子进程的队列,使其并行执行。这样并行度将始终保持。这是代码:
class BigDHistoric:
    """Drain historic workfile stats from bigd_queue and push them to the Tasks API.

    process() fans out one worker process per client_id (pool bounded by
    config.threadsTaskApi); each worker repeatedly pulls batches of unsent
    workfile ids for its client and POSTs them to WorkfileStats/Batch.
    Relies on Config, TasksApi, createErrorLogPost and createLogPostMessage
    defined elsewhere in the project.
    """

    # Tasks API statuses treated as transient failures worth a delayed retry.
    _RETRY_STATUSES = (404, 401, 503, 504, 500)
    # Seconds to wait before retrying a failed API call.
    _RETRY_DELAY = 35

    def __init__(self):
        print("BigD Historic object created")
        self.config = Config()

    def _connect(self):
        """Open a fresh connection to the RDS data database."""
        return psycopg2.connect(
            user=self.config.rds_data_user,
            password=self.config.rds_data_password,
            host=self.config.rds_data_host,
            port="5432",
            database=self.config.rds_data_database,
        )

    def updateBigdQueueJsonBatch(self, client_id):
        """Process every pending bigd_queue row for one client_id.

        Fetches up to config.limit_WF unsent workfile ids at a time and
        posts them to the Tasks API, looping until a fetch returns fewer
        rows than the limit (queue drained) or no rows at all.

        On a retryable HTTP error the batch is logged and retried after a
        pause. (Previously this recursed into itself, which risked
        unbounded recursion depth and, once the recursion returned, fell
        back into the outer loop and re-read the queue.)

        Raises: re-raises any exception after printing it; connection is
        closed in ``finally`` either way.
        """
        batchConn = None
        processed_workfiles = 0
        try:
            print("Processing client_id: "+str(client_id)+"; historic data bigd_queue with process id: "+str(os.getpid()))
            tasksApi = TasksApi()
            token = tasksApi.getTasksApiToken()
            while True:
                print('1 - CLIENT_ID: '+str(client_id)+" BEFORE FETCH , processed_workfiles: "+str(processed_workfiles))
                batchConn = self._connect()
                cur = batchConn.cursor()
                # Parameterized query — never interpolate values into SQL.
                cur.execute(
                    "select workfile_id from bigd_queue"
                    " where sent = 0 and message_code = 1 and client_id = %s"
                    " order by id limit %s",
                    (client_id, self.config.limit_WF),
                )
                rowcount = cur.rowcount
                if rowcount <= 0:
                    cur.close()
                    break
                listWf = [wf[0] for wf in cur.fetchall()]
                cur.close()
                batchConn.close()
                batchConn = None
                data = {
                    "client_id": client_id,
                    "workfile_id": listWf,
                    "use_read_replica": 1,
                    "verbose": self.config.verboseTaskApi,
                }
                controller = 'WorkfileStats/Batch/'
                start_time = time.time()
                r = tasksApi.post(controller, data, token)
                elapsed_time = round(time.time() - start_time, 2)
                print("2 - CLIENT_ID: "+str(client_id)+" RESULT: "+str(r.status_code))
                if r.status_code in self._RETRY_STATUSES:
                    self.createErrorLogPost(client_id,"[Cron_bigd.bigd_historic] Error returning status from TaskAPI: "+str(r.status_code)+" client_id: "+str(client_id)+"; workfile_list: "+str(listWf)+"; elapsed_time response: "+str(elapsed_time))
                    time.sleep(self._RETRY_DELAY)
                    continue  # retry the same pending rows on the next pass
                processed_workfiles += rowcount
                print("3 - CLIENT_ID: "+str(client_id)+" processed_workfiles: "+str(processed_workfiles))
                if self.config.verboseTaskApi == 1:
                    self.createLogPostMessage(client_id,0,"[Cron_bigd.bigd_historic] returning status from TaskAPI: "+str(r.status_code)+" client_id: "+str(client_id)+"; workfile_list: "+str(listWf)+"; elapsed_time response: "+str(elapsed_time))
                if rowcount < self.config.limit_WF:
                    break  # short batch means the queue is drained
        except Exception as e:
            print(e)
            raise
        finally:
            print("PROCESSED client_id: "+str(client_id)+"; historic data bigd_queue with process id: "+str(os.getpid()))
            if batchConn is not None:
                batchConn.close()

    def process(self):
        """Entry point (called from other .py files): fan out the workers.

        Collects every client_id that still has unsent historic rows
        (message_code = 1) and processes them in parallel with a pool of
        config.threadsTaskApi worker processes. Exits with status 1 on
        any error.
        """
        connBigD = None
        try:
            print("--------------------------------------")
            print("STARTING BIGD_HISTORIC FROM BIGD_QUEUE")
            print("--------------------------------------")
            connBigD = self._connect()
            curBatch = connBigD.cursor()
            curBatch.execute("select client_id from bigd_queue where message_code = 1 and sent = 0 group by client_id order by client_id")
            cliList = [row_client[0] for row_client in curBatch.fetchall()] if curBatch.rowcount > 0 else []
            curBatch.close()
            connBigD.close()
            connBigD = None
            if cliList:
                # NOTE(review): sharding by client_id means one client with a
                # much larger backlog leaves a single worker running alone at
                # the tail; to keep full parallelism the per-client queue
                # itself would have to be chunked across workers.
                with Pool(self.config.threadsTaskApi) as pool:
                    # Blocking map() re-raises any worker exception here.
                    pool.map(self.updateBigdQueueJsonBatch, cliList)
        except Exception as e:
            print("Error Exception on bigd_historic: "+str(e))
            self.createErrorLogPost(0,"[Cron_bigd.bigd_historic] Error: "+str(e))
            sys.exit(1)
        finally:
            print("--------------------------------------------")
            print("ENDING PROCESS BIGD_HISTORIC FROM BIGD_QUEUE")
            print("--------------------------------------------")
            if connBigD is not None:
                connBigD.close()
解决方案
推荐阅读
- python - 将 Keras 集成到 SKLearn 管道?
- python-3.x - 如何使用这种数据集格式创建和弦图?
- python - 我想用 python 求解一个线性方程:LinAlgError 奇异矩阵
- c++ - 如何定义凭据提供程序使用场景
- ios - 如何修复此代码以允许计时器在用户单击 UIAlert 时启动?
- ksqldb - 空的 KSQL 流
- sql - PostgreSQL 删除视图(如果存在)
- excel-formula - 使用拆分分隔字符串的公式获取#value
- c++ - 如何使用 C++ 接口从 AVRO 文件中读取数据?
- android - Android 发布构建期间的 bundleReleaseJsAndAssets 花费了无限时间