Python multiprocessing parallel writes to a single gzip

Problem Description

TL;DR: Is there a Python library that allows writing to a single gzip file in parallel from multiple processes?

Details:

I am trying to use Python to copy a large compressed file (.gz) to another compressed file (.gz). I will be performing intermediate processing on the data that is not present in the code sample. I would like to be able to use multiprocessing with a lock to write to the new gzip in parallel from multiple processes, but I get an invalid-format error on the output .gz file.

I believe this is because a lock alone is not enough to support parallel writes to a gzip. Since compressed data needs to "know" about the data that came before it in order to make valid entries into the archive, I don't think Python can handle this by default. I am guessing that each process maintains its own view of the gzip output, and that this state diverges after the first write.

If I open the target file in the script without gzip, this all works. I could also write to multiple gzips and merge them, but I would prefer to avoid that if possible.

Here is my source code:

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue, Lock

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break
        
def writer(outfile, data_queue, coordinator_queue, write_lock, ID):
    print("Writer Started.")
    while True:
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            outfile.flush() 
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            print("Writer",ID,"-","Write Lock:",write_lock)
            write_lock.acquire()
            print("Writer",ID,"-","Write Lock:",write_lock)
            for line in queue_message:
                print("Line write:",line)
                outfile.write(line)
            write_lock.release()
            print("Writer",ID,"-","Write Lock:",write_lock)

def coordinator(reader_procs, writer_procs, coordinator_queue, data_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    writer_procs=2
    chunk_size=1
    queue_size=96
    data_queue = Queue(queue_size)
    coordinator_queue=Queue()
    write_lock=Lock()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile = gzip.open(outfile_path, 'wt')
    #Works when it is uncompressed
    #outfile=open(outfile_path, 'w')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    writers = [Process(target=writer, args=(outfile, data_queue, coordinator_queue, write_lock, x)) for x in range(writer_procs)]   
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, coordinator_queue, data_queue))

    coordinator_p.start()
    for process in readers:
        process.start()
    for process in writers:
        process.start()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()

Notes about the code:

I guess I need a library that can somehow coordinate the compressed writes between the different processes. Obviously, that suggests using a single process to perform the writing (like the coordinator process), but that would likely introduce a bottleneck.

There are some related posts on Stack Overflow, but none of them seem to address exactly what I am trying to do. I have also seen utilities such as "mgzip", "pigz", and "migz" that can compress in parallel, but I don't believe they apply to this use case. mgzip did not work in my testing (it produced a zero-size file), pigz appears to consume an entire file as input on the command line, and migz is a Java library, so I am not sure how to integrate it into Python.
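
(As an aside, pigz will also compress data read from a pipe rather than a named file, so in principle it can be fed from Python through subprocess. Below is only a rough sketch of that idea, assuming the pigz binary is installed and on PATH; write_with_pigz and the output path are made up for illustration.)

#python3.8
#Rough sketch only: assumes the pigz binary is installed and on PATH
import subprocess

def write_with_pigz(lines, out_path, threads=4):
    #pigz with no file arguments compresses stdin; "-c" sends output to stdout,
    #"-p" sets the number of compression threads
    with open(out_path, 'wb') as out:
        proc = subprocess.Popen(['pigz', '-p', str(threads), '-c'],
                                stdin=subprocess.PIPE, stdout=out)
        for line in lines:
            proc.stdin.write(line.encode('utf-8'))
        proc.stdin.close()
        proc.wait()

write_with_pigz(['example record 1\n', 'example record 2\n'], 'pigz_example.json.gz')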

If it can't be done, so be it, but any answer would be appreciated!

-------- Update and working code:

With Mark Adler's help, I was able to create a multiprocessing script that compresses the data in parallel and has a single writer process append it to the target .gz file. With the throughput of modern NVMe drives, this reduces the likelihood of becoming CPU-bound by compression before becoming I/O-bound.

The biggest changes required to make this code work were to move the compression out of the writer and into separate compressor processes, each of which compresses its chunk into a complete gzip stream with gzip.compress, and to have a single writer process append those compressed members to the output file, which is now opened as a plain binary file rather than with gzip.open.

It is worth noting that this does not write to the file in parallel; rather, it handles the compression in parallel. Compression is the heavy lifting of this process anyway.

Updated code (tested and working as-is):

#python3.8
import gzip
from itertools import islice
from multiprocessing import Process, Queue

def reader(infile, data_queue, coordinator_queue, chunk_size):
    print("Reader Started.")
    while True:
        data_chunk = list(islice(infile, chunk_size))
        data_queue.put(data_chunk)
        coordinator_queue.put('CHUNK_READ')
        if not data_chunk:
            coordinator_queue.put('READ_DONE')
            #Process exit
            break

def compressor(data_queue, compressed_queue, coordinator_queue):
    print("Compressor Started.")
    while True:
        chunk = ''
        queue_message = data_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion      
            coordinator_queue.put('COMPRESS_DONE')
            #Process exit
            break
        else:
            for line in queue_message:
                #Assemble concatenated string from list
                chunk += line
            #Encode the string as binary so that it can be compressed
            #Setting gzip compression level to 9 (highest)
            compressed_chunk=gzip.compress(bytes(chunk,'utf-8'),compresslevel=9)            
            compressed_queue.put(compressed_chunk)

def writer(outfile, compressed_queue, coordinator_queue):
    print("Writer Started.")
    while True:
        queue_message = compressed_queue.get()
        if (queue_message == 'DONE'):
            #Notify coordinator process of task completion      
            coordinator_queue.put('WRITE_DONE')
            #Process exit
            break
        else:
            outfile.write(queue_message)

def coordinator(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue):
    print("Coordinator Started.")
    active_readers=reader_procs
    active_compressors=compressor_procs
    active_writers=writer_procs
    while True:
        queue_message = coordinator_queue.get()
        if queue_message=='READ_DONE':
            active_readers = active_readers-1
            if active_readers == 0:
                while not data_queue.qsize() == 0:
                    continue
                [data_queue.put('DONE') for x in range(compressor_procs)]
        if queue_message=='COMPRESS_DONE':
            active_compressors = active_compressors-1
            if active_compressors == 0:
                while not compressed_queue.qsize() == 0:
                    continue
                [compressed_queue.put('DONE') for x in range(writer_procs)]
        if queue_message=='WRITE_DONE':
            active_writers = active_writers-1
            if active_writers == 0:
                break

def main():
    reader_procs=1
    compressor_procs=2
    #writer_procs really needs to stay as 1 since writing must be done serially
    #This could probably be written out...
    writer_procs=1
    chunk_size=600
    queue_size=96
    data_queue = Queue(queue_size)
    compressed_queue=Queue(queue_size)
    coordinator_queue=Queue()
    infile_path='/directory/input_records.json.gz'
    infile = gzip.open(infile_path, 'rt')
    outfile_path='/directory/output_records.json.gz'
    outfile=open(outfile_path, 'wb')
    readers = [Process(target=reader, args=(infile, data_queue, coordinator_queue, chunk_size)) for x in range(reader_procs)]
    compressors = [Process(target=compressor, args=(data_queue, compressed_queue, coordinator_queue)) for x in range(compressor_procs)]
    writers = [Process(target=writer, args=(outfile, compressed_queue, coordinator_queue)) for x in range(writer_procs)]
    coordinator_p = Process(target=coordinator, args=(reader_procs, writer_procs, compressor_procs, coordinator_queue, data_queue, compressed_queue))
    coordinator_p.start()
    for process in readers:
        process.start()
    for process in compressors:
        process.start()     
    for process in writers:
        process.start()
    for process in compressors:
        process.join()
    for process in readers:
        process.join()
    for process in writers:
        process.join()
    coordinator_p.join()
    outfile.flush()
    outfile.close()

main()

Tags: python, multiprocessing, compression, gzip, write

Solution


This is actually quite simple to do by writing a complete gzip stream from each thread to the single output file. Yes, you will need one thread to do all the writing, with each compression thread taking turns writing all of its gzip stream before another compression thread gets to write any. The compression threads can all do their compression in parallel, but the writing needs to be serialized.
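
To make that concrete, here is a minimal sketch of the scheme (an illustration only, not code from the question or the answer): each worker compresses its chunk into a complete gzip member and takes a lock just long enough to append that finished member to the shared output file, so members from different workers never interleave. The file name and the toy chunks are invented for the example.

#python3.8
import gzip
from multiprocessing import Lock, Process

def worker(chunks, lock, out_path):
    for chunk in chunks:
        #Each chunk becomes a complete gzip stream (header, compressed data, trailer)
        member = gzip.compress(chunk.encode('utf-8'))
        #The lock only serializes the append of the finished member
        with lock:
            with open(out_path, 'ab') as out:
                out.write(member)

if __name__ == '__main__':
    out_path = 'output_records.json.gz'
    open(out_path, 'wb').close()   #Start with an empty file
    lock = Lock()
    workers = [Process(target=worker, args=([f'record {i}\n'], lock, out_path))
               for i in range(4)]
    for p in workers:
        p.start()
    for p in workers:
        p.join()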

The reason this works is that the gzip standard, RFC 1952, says that a gzip file consists of a series of members, where each member is a gzip header, compressed data, and a gzip trailer.
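
As a quick sanity check of that point (again, just an illustration), two chunks compressed independently with gzip.compress and written back to back into one plain binary file still read back as a single valid gzip file:

#python3.8
import gzip

member_a = gzip.compress(b'first chunk of data\n')    #complete gzip member 1
member_b = gzip.compress(b'second chunk of data\n')   #complete gzip member 2

with open('members.gz', 'wb') as f:
    #Plain binary writes; the members are simply placed back to back
    f.write(member_a)
    f.write(member_b)

with gzip.open('members.gz', 'rt') as f:
    #gzip.open reads across member boundaries transparently
    print(f.read())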

