首页 > 解决方案 > 我正在尝试在 python 中使用多处理进行压缩

问题描述

这就是我想要做的-

import lz4.frame
import multiprocessing as mp

pool = mp.Pool(mp.cpu_count())
chunk_size = 64*1024
#screen is a pygame.Surface object converted to bytes

with lz4.frame.LZ4FrameCompressor(compression_level = 10, auto_flush = True) as compressor:
                compressed = pool.map(compressor.compress, [screen[i : i + chunk_size] for i in range(0, len(screen), chunk_size)])
                compressed = compressor.begin() + b''.join(compressed)
                compressed += compressor.flush()
pool.close()

当我使用 map 而不是 pool.map 时,这工作得很好......使用 pool.map,什么也没有发生。甚至没有错误...

另外,压缩函数中 block_size 参数的意义何在?我尝试了参数 block_size(4 和 5)和 chunk_size(64k 和 256k)的不同组合,但似乎没有太大区别。

标签: pythonmultiprocessingcompression

解决方案


如果它不是 fork 安全的,那么在上下文中进行多处理可能会很危险。

在文档(https://python-lz4.readthedocs.io/en/stable/intro.html)中,我读到:

绑定在调用底层 LZ4 库时会删除 GIL,并且是线程安全的。

您是否尝试过多线程?

import lz4.frame
from concurrent.futures import ThreadPoolExecutor
import os

nb_chunk = 1024
chunk_size = 64*1024
screen = os.urandom(nb_chunk*chunk_size)

context = lz4.frame.create_compression_context()
chunks = [screen[i : i + chunk_size] for i in range(0, len(screen), chunk_size)]
contexts = [context] * len(chunks)
compressed = lz4.frame.compress_begin(context)
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
    compressed_chunks = executor.map(lz4.frame.compress_chunk, contexts, chunks)
compressed += b''.join(compressed_chunks)
compressed = lz4.frame.compress_flush(context)

推荐阅读