python - 如何知道多处理(Python 模块)中的池中有多少线程/工作者已经完成?
问题描述
我正在使用 imapala shell 来计算包含表名的文本文件的一些统计信息。
我正在使用 Python 多处理模块来汇集进程。
事情是事情任务非常耗时,所以我需要跟踪完成了多少文件才能看到工作进度。
所以让我给你一些关于我正在使用的功能的想法。
job_executor
是获取表列表并执行任务的函数。
main()
是函数,它获取文件位置,没有执行程序(pool_workers),将包含表的文件转换为表列表并执行多处理操作
我想查看 job_executor 处理了多少文件等进度,但我找不到解决方案。使用计数器也不起作用。
def job_executor(text):
impala_cmd = "impala-shell -i %s -q 'compute stats %s.%s'" % (impala_node, db_name, text)
impala_cmd_res = os.system(impala_cmd) #runs impala Command
#checks for execution type(success or fail)
if impala_cmd_res == 0:
print ("invalidated the metadata.")
else:
print("error while performing the operation.")
def main(args):
text_file_path = args.text_file_path
NUM_OF_EXECUTORS = int(args.pool_executors)
with open(text_file_path, 'r') as text_file_reader:
text_file_rows = text_file_reader.read().splitlines() # this will return list of all the tables in the file.
process_pool = Pool(NUM_OF_EXECUTORS)
try:
process_pool.map(job_executor, text_file_rows)
process_pool.close()
process_pool.join()
except Exception:
process_pool.terminate()
process_pool.join()
def parse_args():
"""
function to take scraping arguments from test_hr.sh file
"""
parser = argparse.ArgumentParser(description='Main Process file that will start the process and session too.')
parser.add_argument("text_file_path",
help='provide text file path/location to be read. ') # text file fath
parser.add_argument("pool_executors",
help='please provide pool executors as an initial argument') # pool_executor path
return parser.parse_args() # returns list/tuple of all arguments.
if __name__ == "__main__":
mail_message_start()
main(parse_args())
mail_message_end()
解决方案
如果您坚持通过 进行不必要multiprocessing.pool.Pool()
的操作,那么跟踪正在发生的事情的最简单方法是使用非阻塞映射(即multiprocessing.pool.Pool.map_async()
):
def main(args):
text_file_path = args.text_file_path
NUM_OF_EXECUTORS = int(args.pool_executors)
with open(text_file_path, 'r') as text_file_reader:
text_file_rows = text_file_reader.read().splitlines()
total_processes = len(text_file_rows) # keep the number of lines for reference
process_pool = Pool(NUM_OF_EXECUTORS)
try:
print('Processing {} lines.'.format(total_processes))
processing = process_pool.map_async(job_executor, text_file_rows)
processes_left = total_processes # number of processing lines left
while not processing.ready(): # start a loop to wait for all to finish
if processes_left != processing._number_left:
processes_left = processing._number_left
print('Processed {} out of {} lines...'.format(
total_processes - processes_left, total_processes))
time.sleep(0.1) # let it breathe a little, don't forget to `import time`
print('All done!')
process_pool.close()
process_pool.join()
except Exception:
process_pool.terminate()
process_pool.join()
这将每 100 毫秒检查一些进程是否完成处理,如果自上次检查后发生更改,它将打印出到目前为止处理的行数。如果您需要更深入地了解您的子流程正在发生的事情,您可以使用一些共享结构(如multiprocessing.Queue()
或multiprocessing.Manager()
结构)直接从您的流程中报告。
推荐阅读
- swift - 每当我尝试呈现以前的视图控制器时都会出现阴影
- c++ - NodeMCU 无法让 MQTT 和 Led strip 脚本一起工作
- c# - 在 Windows 10 中使用 CommonOpenFileDialog 选择文件夹但仍显示文件夹中的文件
- mysql - 无法读取未定义节点 js 的属性“查询”
- sql-server - 如何根据参数从存储过程中运行 SQL 语句
- java - 返回的字符串值未显示,JAVA
- node.js - 'string | 类型的参数 字符串[] | 解析问题 | ParsedQs[]' 不可分配给“字符串”类型的参数
- javascript - 如何将带有链接的字符串解析为html链接
- python - 通过检查标志循环
- javascript - JQuery load() 状态没有意义