python - 我正在尝试在 python 中使用多处理进行压缩
问题描述
这就是我想要做的-
import lz4.frame
import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
chunk_size = 64*1024
#screen is a pygame.Surface object converted to bytes
with lz4.frame.LZ4FrameCompressor(compression_level = 10, auto_flush = True) as compressor:
compressed = pool.map(compressor.compress, [screen[i : i + chunk_size] for i in range(0, len(screen), chunk_size)])
compressed = compressor.begin() + b''.join(compressed)
compressed += compressor.flush()
pool.close()
当我使用 map 而不是 pool.map 时,这工作得很好......使用 pool.map,什么也没有发生。甚至没有错误...
另外,压缩函数中 block_size 参数的意义何在?我尝试了参数 block_size(4 和 5)和 chunk_size(64k 和 256k)的不同组合,但似乎没有太大区别。
解决方案
如果它不是 fork 安全的,那么在上下文中进行多处理可能会很危险。
在文档(https://python-lz4.readthedocs.io/en/stable/intro.html)中,我读到:
绑定在调用底层 LZ4 库时会删除 GIL,并且是线程安全的。
您是否尝试过多线程?
import lz4.frame
from concurrent.futures import ThreadPoolExecutor
import os
nb_chunk = 1024
chunk_size = 64*1024
screen = os.urandom(nb_chunk*chunk_size)
context = lz4.frame.create_compression_context()
chunks = [screen[i : i + chunk_size] for i in range(0, len(screen), chunk_size)]
contexts = [context] * len(chunks)
compressed = lz4.frame.compress_begin(context)
with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor:
compressed_chunks = executor.map(lz4.frame.compress_chunk, contexts, chunks)
compressed += b''.join(compressed_chunks)
compressed = lz4.frame.compress_flush(context)
推荐阅读
- django - Django REST Serializer 使用错误的模型进行序列化
- python - python文件写入程序运行时如何更新桌面上的文件大小
- javascript - 使用 d3.js 更新表数据
- c# - C#捕获从不在进程中的函数返回的异常?
- r - 如何设置仅在输入 3 时才显示集合向量的函数?
- javascript - 如果 URI 没有改变,例如在单页应用程序上,如何检测用户是否在新页面上?
- angular - Angular Kendo UI 全局访问
- php - 内连接循环通过
- git - 如何 git rebase 从另一个分支直接到 master 分支?
- javascript - 受控数字比例映射