首页 > 解决方案 > 从单个文件读取和写入多个压缩帧

问题描述

我有非常大的字节数据存储在 txt 文件(每个大小约为 1TB,(4 个深度相机同时运行 24-7 个)以大约每分钟 4GB 的速度写入硬盘驱动器)。

每个文件都包含大量以字节编码的图像帧。每帧都使用 lz4 压缩进行压缩,并写入相应的摄像机文件名,虽然只有 4 个文件,但对于更长的记录会增加。

当只有数据时,每个帧的大小都是未知的。

每个帧由一个未压缩的字节数组模式分隔,该模式连接到已压缩帧的末尾。像这样:

                map_to_sort.sort()
                frame_ender_bytes = bytearray("<FRAME_END>", 'utf-8')
                for k, frame_compressed in map_to_sort.mapper.items():
                    with lz4.frm.open(frame_compressed[0], 'ab') as fp:
                        fp.write(frame_compressed[1] + frame_ender_bytes)

我正在尝试读取每一帧,解压缩它,并将未压缩的帧写入另一个 12TB 硬盘驱动器(在 18 小时内,或者最好更少,(比将数据写入和压缩到文件所需的时间更少)。我是不幸的是,这个项目无法使用大容量 SSD,所以我必须了解如何管理读取和写入压缩数据到硬盘的瓶颈。

关于如何解决它,我提出了几种不同的想法,但是它们似乎都没有那么好。当然,我尝试过一些幼稚的方法来逐字节读取文件(太长而无法读取)。我也试过:

以大约每帧大小的块读取文件并使用拆分功能进行检查,如下所示:

import lz4.frame as frm
import numpy as np
def chunky(stream):
    buffer = bytearray()
    while True: 
        chunk = stream.read(40096)
        if chunk:
            buffer += chunk
        else:
            break
        while True:
            combine = buffer.split(b'<FRAME_END>', 1)
            if len(combine) < 2:
                break
            else:
                buffer = combine[1]
                part = combine[0]
                yield part

#................................................
# Elsewhere in code
# dec_path is a string variable that stores the decompressed file path corresponding to the camera path file
with frm.open(camera_file_path, 'rb') as fp:
    for ch in chunky(fp):
        output_arr = ch
        decompressed_frame = frm.decompress(output_arr)
        with frm.open(dec_path, 'ab') as fj:
            fj.write(decompressed_frame)

这种方法当然有很多边缘情况((如果 <FRAME_END> 被读取分割成一个块读取怎么办......,分割功能将不起作用,它会显示一个无法识别的帧.(即使这样也不会经常发生,解压和写入的时间仍然大于18小时)

创建一个 JSON/txt 文件,其中包含类似地图的结构。每个键映射到每个帧的开始和结束位置(以字节为单位)。然后将数据存储为字典,然后利用它以适当的字节数读取每一帧。编写压缩文件:

# In this example there are no separators between each frame as I am keeping track of positions.
# map_to_sort is a dictionary organized like so 
# int k : ["E:\\Frame_Data\cam_compressed_0.txt", #compressed frame, 56000, 10000, "E:\\Frame_Data\\cam_compressed_0_mapper_data.txt"]
map_to_sort.sort()
for k, frame_compressed in map_to_sort.mapper.items():
    with frm.open(frame_compressed[0], 'ab') as fp:
         fp.write(frame_compressed[1])
    with open(frame_compressed[4], 'w') as fil:
         str_to_write = str(k) + "," + str(frame_compressed[2]) + "," + str(frame_compressed[3]) + "," + frame_compressed[0] + "," + "\n"
         fil.write(str_to_write)
map_to_sort.clear()

读写解压文件

with open(TEXT_COMPRESSED_MAPPER_PATH, 'r') as fr:
     for line in fr:
         str_arr = (line.strip()).split(",")
         file_comp_read_from = str_arr[3]
         start_byte_pos = str_arr[1]
         end_byte_pos = str_arr[2]
         with frm.open(str_arr[3], 'rb') as fp:
             fp.seek(start_byte_pos, 0)
             fp.read(end_byte_pos - start_byte_pos)
         # Same decompression code as before

然而,这样做的缺点是文本或 json 文件占用的内存(并非微不足道)和 RAM(RAM 限制)。以及帧读取不一致的事实,我会得到许多没有正确解压缩的帧。也许我不确定我是否正在收集相对于框架开始和结束位置的文件大小的真实确切位置。如果有人有任何指示也可以提供帮助。

最有可能帮助的最后一件事是将 lz4 压缩级别从 7 提高到更高的级别。较小的文件大小无疑会减少管理。(解压缩在我的 CPU 上不是那么密集)。

在压缩级别 7 时,我已经以大约 70% 的速度使用了所有 8 个内核,而在级别 8 压缩时,我开始丢失帧,因为解压缩时间太长,并且 CPU 使用率达到 98%。

很多压缩代码使用多线程、多处理的组合。代码太长,不能在这里发布,我也不一定允许发布包含它的链接(目前还不是开源的,如果你愿意,我可以发布其中的一部分)。

基本思想是让一个进程处理记录并将帧放入共享队列,另一个进程同时发生,检查队列是否为空,然后利用线程一次压缩多个帧来处理队列(帧的弹出) . 然后对未排序的帧进行排序并按顺序写入相应的文件。如果有人对如何显着提高压缩率而不会对我的 CPU 造成太大压力有任何指示,那就太好了。

我对这种设计不太满意,因为在两个进程之间使用共享队列并不是很好的做法。然而,我觉得没有什么选择,因为我使用的英特尔深度摄像头与多处理模块配合得不是特别好,所以我无法轻松地为每个摄像头设置一个进程。英特尔深度摄像头 python 模块的实现也不适合以简单的方式同时记录和流式传输。

如果您希望在我处理文件压缩和 i/o 的方式上查看更多代码,请告诉我。

随意提及有损或其他更快或

抱歉信息量很大。

TLDR: 在 python 中使用 lz4 压缩在短时间内解压大量文件,然后利用 I/0 将解压后的文件写入另一个硬盘驱动器时遇到问题。上面的一些代码和确切信息。目标是能够在 18 小时或更短的时间内解压缩所有文件并将它们写入另一个硬盘驱动器。

编辑

我能够使用解决方案中建议的两种方法获得我正在寻找的压缩级别。(更快的读写时间)。然而,这些在 6 级压缩而不是 lz4 下使用 gzip 效果最好。我看到我的大多数文件的压缩比约为 6:1。您的 cpu 的核心越多,质量越高,您将获得更好的结果。我有 4 个同步深度流,使用 AMD Ryzen 7 5800X 处理器,任何更高的压缩级别,我开始丢帧。理论上,如果你有更多的核心,你可能会得到更好的结果。我还没有测试过这个。您将遇到的主要瓶颈将是硬盘的速度。如果您有预算,我强烈建议您使用 SSD。

标签: pythonfile-iocompressionpython-multiprocessinglz4

解决方案


这是使用固定长度标头拆分文件中的 blob 的概念证明。不确定这是否可以解决您所有的问题,但可能是部分问题。

import random
import string
import tempfile
import os

def bytes_generator():
    out = []
    for i in range(random.randint(10, 20)):
        out.append(random.choice(string.ascii_lowercase))
    return ''.join(out).encode('utf-8')

INT_BYTE_SIZE = 8
def int_to_bytes(x: int) -> bytes:
    return x.to_bytes(INT_BYTE_SIZE, 'big')
def int_from_bytes(xbytes: bytes) -> int:
    return int.from_bytes(xbytes, 'big')

def run():
    filehandle = tempfile.TemporaryFile()
    blobs = []
    for i in range(10):
        data = bytes_generator()
        blobs.append(data)
        int_bytes = int_to_bytes(len(data))
        filehandle.write(int_bytes)
        filehandle.write(data)

    # Reset as if just opened
    filehandle.seek(0, os.SEEK_SET)

    # Get file legnth
    filehandle.seek(0, os.SEEK_END)
    file_length = filehandle.tell()

    # Reset for reading
    filehandle.seek(0, os.SEEK_SET)

    i = 0
    while filehandle.tell() < file_length:
        data_length = int_from_bytes(filehandle.read(INT_BYTE_SIZE))
        read_data = filehandle.read(data_length)
        assert read_data == blobs[i]
        i += 1

if __name__ == "__main__":
    run()

在 Python 3 中或多或少地从将 int 转换为字节中窃取的 int/byte 转换


推荐阅读