首页 > 解决方案 > 如何加快在python中转换二进制文件?

问题描述

我有要读取的二进制文件,然后在 python 中写入另一个文件(CSV 或 pickle)。

我有一个解决方案,但需要很长时间。

二进制文件由几个数据集组成。数据集有 1 个标头(4 字节)、1 个序列数据(4 字节)、100 条消息(ID:2 字节,数据:8 字节)。

aa aa aa aa c8 05 00 00 51 02 15 04 ca 8c 00 10
28 80 94 03 00 20 00 00 ff 83 23 98 b0 02 a2 ff
00 07 5a 75 00 00 11 01 00 80 00 ff 4f 2c 0d 84
12 01 ff 50 00 00 ff 2c 0d 00 20 02 0f a4 7e 00
00 fb 0f 12 60 02 06 11 07 30 45 c8 69 20 16 03
05 11 9a 0d 11 0e 00 7f 29 03 d6 9a 81 8c 31 28
00 10 51 02 14 04 cb 50 00 0f 08 80 b0 02 a2 ff
00 07 4b a5 00 00 11 01 00 80 00 ff 4f 25 0d b8
12 01 ff a0 00 00 ff 25 0d 00 20 02 12 c4 7e 00 ...

这是我拥有的文件的一个示例。

为了解析二进制文件,我这样编码。

def parse(self, bindata):
    msg_list = defaultdict(list)

    
    with memoryview(bindata) as mv:
        old_seq = None

        while mv:
            # header
            header = mv[:HEADER_SIZE].tobytes()
            mv = mv[HEADER_SIZE:]
            if (header != HEADER):
                logging.error("invalid header")
                break

            # seq
            seq = int.from_bytes(
                mv[:SEQUENCE_SIZE].tobytes(), byteorder='little')
            mv = mv[SEQUENCE_SIZE:]
            if (old_seq and seq - old_seq != 1):
                logging.warning(
                    "sequence error. old=%d / current=%d", old_seq, seq)
            old_seq = seq
            

            # msg
            for msg_cnt in range(0, MSG_COUNT):
                if not mv or mv[:HEADER_SIZE] == HEADER:
                    break


                id = int.from_bytes(
                    mv[:MSG_ID_SIZE].tobytes(), byteorder='little')
                mv = mv[MSG_ID_SIZE:]
                
                msg = None
                try:
                    msg = self.__db.get_message_by_frame_id(id)
                except KeyError as e:
                    logging.exception("unknown can id. %s", hex(id))
                    mv = mv[MSG_DATA_SIZE:]
                    continue

                body = mv[:MSG_DATA_SIZE].tobytes()
                mv = mv[MSG_DATA_SIZE:]

                try:
                    decoded_msg = msg.decode(
                        body, decode_choices=False)
                    for k, v in decoded_msg.items():
                        msg_list[k].append(v)

                except Exception as e:
                    logging.exception("unpack error. %s", str(e))
    return msg_list

这段代码似乎很耗时,因为它按顺序访问数据。

所以我想知道另一种方法。

我能得到更好的推荐吗?

标签: pythonperformanceparsingbinaryfiles

解决方案


你需要这个struct模块。那么,每个块是 1008 字节吗?

    for i in range(0,len(mv),1008):
        hdr,seq = struct.unpack('II', mv[i:i+8] )
        for msg in range( 8, 1008, 100 ):
            id = mv[i+msg] * 256 + mv[i+msg+1]
            code = mv[i+msg+2:i+msg+10]

推荐阅读