python - Python multiprocessing parallel writes to a single gzip
Problem Description
TL;DR: Is there a Python library that allows writing to a single gzip file in parallel from multiple processes?
Details:
I am trying to copy a large compressed file (.gz) to another compressed file (.gz) using Python. I will perform intermediate processing on the data that is not present in the code example. I would like to be able to write to the new gzip from multiple processes in parallel using multiprocessing with a lock, but I get an invalid format error on the output gz file.
I assume this is because a lock alone is not enough to support parallel writes to a gzip: since compressed data needs to "know" the data that came before it in order to make correct entries into the archive, I don't think Python can handle this by default. I'm guessing that each process maintains its own awareness of the gzip output, and that this state diverges after the first write.
If I open the target file in the script without using gzip, then all of this works. I could also write to multiple gzips and merge them afterwards, but I would prefer to avoid that if possible.
Here is my source code:
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break

def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
    print("Writer Started.")
    while True:
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            outfile.flush()
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            print("Writer",ID,"-","Write Lock:",write_lock)
            write_lock.acquire()
            print("Writer",ID,"-","Write Lock:",write_lock)
            for line in queue_message:
                print("Line write:",line)
                outfile.write(line)
            write_lock.release()
            print("Writer",ID,"-","Write Lock:",write_lock)

def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    writer_procs=2
    chunk_size=1
    queue_size=96
    data_queue = Queue(queue_size)
    coordinator_queue=Queue()
    write_lock=Lock()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile = gzip.open(outfile_path, 'wt')
    #Works when it is uncompressed
    #outfile=open(outfile_path, 'w')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))
    coordinator_p.start()
    for process in readers:
        process.start()
    for process in writers:
        process.start()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()
Notes about the code:
- "chunk_size" determines how many lines are pulled from the input file at a time
- I have been using a much smaller test file while trying to make this work
- My input file is over 200 GB when uncompressed
- My output file will be over 200 GB when uncompressed
- This version of the code has been trimmed down and may have some errors, but it is directly based on what I am running.
- All functional areas of the script have been retained.
I'm guessing that I need a library that can somehow coordinate the compressed writes between the different processes. The obvious alternative is to have a single process perform the writes (such as the coordinator process), but that would likely introduce a bottleneck.
There are some related posts on Stack Overflow, but none of them seem to address exactly what I am trying to do. I have also seen utilities such as "mgzip", "pigz", and "migz" that can parallelize compression, but I don't believe they apply to this use case. mgzip did not work in my testing (0-sized file), pigz appears to consume an entire file as input on the command line, and migz is a Java library, so I am not sure how to integrate it into Python.
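For what it's worth, pigz does not have to be given a whole file on the command line; it will also compress whatever is piped to its stdin. The following is only a minimal sketch of driving it from Python via subprocess (it assumes the pigz binary is installed and on PATH, and reuses the placeholder paths from this question); it is not the approach used in the rest of this post:

import gzip
import subprocess

#Minimal sketch: decompress the input in Python, do any per-line processing,
#then pipe the result through pigz, which compresses on several threads (-p 4)
#and writes a standard gzip file. Assumes the pigz binary is installed.
with open('/directory/output_records.json.gz', 'wb') as outfile:
    pigz = subprocess.Popen(['pigz', '-p', '4'], stdin=subprocess.PIPE, stdout=outfile)
    with gzip.open('/directory/input_records.json.gz', 'rt') as infile:
        for line in infile:
            #Intermediate processing of each line would happen here
            pigz.stdin.write(line.encode('utf-8'))
    pigz.stdin.close()
    pigz.wait()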
If it can't be done, so be it, but any answer would be much appreciated!
-------- Update and working code:
With Mark Adler's help I was able to create a multiprocessing script that compresses the data in parallel and has a single writer process that appends it to the target gz file. With the throughput of a modern NVMe drive, this lowers the likelihood of being CPU-bound by compression before becoming I/O-bound.
The largest changes needed to make this code work are as follows (a minimal sketch of the idea follows this list):
- gzip.compress(bytes(string, 'utf-8'), compresslevel=9) is needed to compress an individual "block" or "stream"
- file = open(outfile, 'wb') is needed to open an unencoded binary output file that can become the target gzip
- The file.write() operation must happen from a single process, since it has to be done serially.
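To make those three points concrete, here is an illustrative sketch (the chunk contents are placeholders; the full, tested script follows below): each chunk becomes its own complete gzip member via gzip.compress(), and a single plain binary file handle appends the members in order.

import gzip

#Placeholder chunks standing in for the real data read from the input file
chunks = ['{"record": 1}\n', '{"record": 2}\n']

with open('/directory/output_records.json.gz', 'wb') as outfile:
    for chunk in chunks:
        #Each chunk is compressed into a self-contained gzip member;
        #in the real script this call runs in parallel compressor processes
        #while only the write below stays in a single process.
        outfile.write(gzip.compress(bytes(chunk, 'utf-8'), compresslevel=9))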
It is worth noting that this does not write to the file in parallel; rather, it handles the compression in parallel. The compression is the heavy lifting of this process anyway.
Updated code (tested and working as-is):
#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break

def compressor(data_queue, compressed_queue, coordinator_queue):
    print("Compressor Started.")
    while True:
        chunk = ''
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion
            coordinator_queue.put('COMPRESS_DONE')
            #Process exit
            break
        else:
            for line in queue_message:
                #Assemble concatenated string from list
                chunk += line
            #Encode the string as binary so that it can be compressed
            #Setting gzip compression level to 9 (highest)
            compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)
            compressed_queue.put(compressed_chunk)

def writer(outfile, compressed_queue, coordinator_queue):
    print("Writer Started.")
    while True:
        queue_message = compressed_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            outfile.write(queue_message)

def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_compressors=compressor_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(compressor_procs)]
        if queue_message=='COMPRESS_DONE':
            active_compressors = active_compressors-1
            if active_compressors == 0:
                while not compressed_queue.qsize() == 0:
                    continue
                [compressed_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    compressor_procs=2
    #writer_procs really needs to stay as 1 since writing must be done serially
    #This could probably be written out...
    writer_procs=1
    chunk_size=600
    queue_size=96
    data_queue = Queue(queue_size)
    compressed_queue=Queue(queue_size)
    coordinator_queue=Queue()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile=open(outfile_path, 'wb')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
    writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
    coordinator_p.start()
    for process in readers:
        process.start()
    for process in compressors:
        process.start()
    for process in writers:
        process.start()
    for process in compressors:
        process.join()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()
Solution
This is actually quite simple to do by writing complete gzip streams from each thread to a single output file. Yes, you will need one thread that does all the writing, with each compression thread taking turns writing all of its gzip stream before another compression thread gets to write any. The compression threads can all do their compression in parallel, but the writing needs to be serialized.
The reason this works is that the gzip standard, RFC 1952, says that a gzip file consists of a series of members, where each member is a gzip header, compressed data, and a gzip trailer.
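As a small illustration of that property (a sketch added here for clarity, not part of the original answer): every call to gzip.compress() emits a complete member, and concatenating members yields a file that gzip reads back as one continuous stream.

import gzip

#Two independent, complete gzip members (header + deflate data + trailer each)
member1 = gzip.compress(b'first chunk\n')
member2 = gzip.compress(b'second chunk\n')

#Concatenate the members into one file
with open('members.gz', 'wb') as f:
    f.write(member1)
    f.write(member2)

#gzip reads the concatenated members back as a single stream
with gzip.open('members.gz', 'rt') as f:
    print(f.read())   #-> first chunk / second chunk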