python-3.x - 需要通过从 gzip 文件中读取,在 Python 中使用 2+ 个进程进行 CPU 绑定处理
问题描述
我有一个 gzip 文件跨越(压缩 10GB,未压缩 100GB),其中有一些报告由分界线分隔,我必须解析它。解析和处理数据需要很长时间,因此是 CPU 绑定问题(不是 IO 绑定问题)。所以我打算使用multiprocessing
模块将工作分成多个流程。问题是我无法有效地向子进程发送/共享数据。我正在使用subprocess.Popen
流式传输父进程中的未压缩数据。
process = subprocess.Popen('gunzip --keep --stdout big-file.gz',
shell=True,
stdout=subprocess.PIPE)
我正在考虑使用 Lock() 读取/解析 child-process-1 中的一个报告,然后释放锁,然后切换到 child-process-2 以读取/解析下一个报告,然后切换回 child-process- 1 读取/解析下一个报告)。当我process.stdout
与子进程共享 as args 时,出现酸洗错误。
我试图创建数据multiprocessing.Queue()
并将multiprocessing.Pipe()
数据发送到子进程,但这太慢了(实际上它比在单线程中执行它要慢得多,即串行)。
有关有效地将数据发送到子进程的任何想法/示例都会有所帮助。
解决方案
你可以尝试一些简单的东西吗?让每个工作进程运行自己的 实例gunzip
,根本没有进程间通信。Worker 1 可以处理第一个报告,而跳过第二个报告。工人 2 则相反。每个工人都跳过其他报告。N
然后是对工人的明显概括。
或不 ...
我认为您需要更具体地说明您的尝试,并可能提供有关您的问题的更多信息(例如:有多少条记录?它们有多大?)。
这是一个程序(“genints.py”),它打印一堆随机整数,每行一个,通过“xxxxx\n”分隔线分成组:
from random import randrange, seed
seed(42)
for i in range(1000):
for j in range(randrange(1, 1000)):
print(randrange(100))
print("xxxxx")
因为它强制种子,所以每次都会生成相同的东西。现在是一个程序,可以通过我最初想到的最明显的方式并行和串行地处理这些组。 crunch()
花费时间与组中的整数数量成二次方,因此它非常受 CPU 限制。一次运行的输出,并行部分使用(如图所示)3 个工作进程:
parallel result: 10,901,000,334 0:00:35.559782
serial result: 10,901,000,334 0:01:38.719993
所以并行化运行大约需要三分之一的时间。这与您的问题有什么不同?当然,“genints.py”的完整运行产生不到 200 万字节的输出,所以这是一个主要区别 - 但从这里无法猜测这是否是相关的区别。Perahps,例如,您的问题只是非常轻微地受 CPU 限制?从这里的输出可以明显看出,将标准输出块传递给工作进程的开销在这个程序中几乎是微不足道的。
简而言之,你可能需要给人们——就像我刚刚为你做的那样——一个他们可以运行的完整程序来重现你的问题。
import multiprocessing as mp
NWORKERS = 3
DELIM = "xxxxx\n"
def runjob():
import subprocess
# 'py' is just a shell script on my box that
# invokes the desired version of Python -
# which happened to be 3.8.5 for this run.
p = subprocess.Popen("py genints.py",
shell=True,
text=True,
stdout=subprocess.PIPE)
return p.stdout
# Return list of lines up to (but not including) next DELIM,
# or EOF. If the file is already exhausted, return None.
def getrecord(f):
result = []
foundone = False
for line in f:
foundone = True
if line == DELIM:
break
result.append(line)
return result if foundone else None
def crunch(rec):
total = 0
for a in rec:
for b in rec:
total += abs(int(a) - int(b))
return total
if __name__ == "__main__":
import datetime
now = datetime.datetime.now
s = now()
total = 0
f = runjob()
with mp.Pool(NWORKERS) as pool:
for i in pool.imap_unordered(crunch,
iter((lambda: getrecord(f)), None)):
total += i
f.close()
print(f"parallel result: {total:,}", now() - s)
s = now()
# try the same thing serially
total = 0
f = runjob()
while True:
rec = getrecord(f)
if rec is None:
break
total += crunch(rec)
f.close()
print(f"serial result: {total:,}", now() - s)
推荐阅读
- karate - 如何在 parm 请求中传递一个字符串并在我的请求路径中循环它而不保存到文件和创建表
- swift - 发送 JSON 数组作为参数 Alamofire
- ajax - Jaxon 不会在响应中返回任何内容
- go - Golang 在处理函数之外获取请求
- json - 如何使用python删除json中不需要的分隔符
- flutter - flutter schedule功能本地通知不起作用,有谁知道可能是什么?
- matlab - 为什么变量在Matlab中不取负值?
- apache-kafka - 为什么我的 Kafka 连接接收器集群只有一个工作人员处理消息?
- java - 在 when().thenReturn() 返回的对象上调用方法
- ruby-on-rails - 浏览器中的 NoMethodError 但在控制台中工作