首页 > 技术文章 > 使用python,以字段来分割超大文本

love-DanDan 2020-11-07 22:56 原文

本次拿到一个大小为150G+的用户数据文件,要求分割成小文件,以便于快速转换为本地字段。

思路如下:

查看文件实质上是否为文本文件(是)

查看文件结构(xml)

查看文件内容中如何标记一个用户数据的起始与结束(object)

以分割份数为指标,先均分,然后seek到指定位置,向后读取每行,直到匹配到符合条件(用户数据结束)的文件指针位置。查找每一份子文件相对于母文件的文件指针位置(使用seek和tell、readline、re模块的findall进行匹配)

记录到每份子文件(起始)相对于母文件的指针位置,根据分割份数,创建进程(虽然是IO密集操作,但是担心线程造成的锁反而会降低效率,所以开进程来进行资源独享,虽然都是打开同一份母文件,但是读取位置互不干扰)进行二进制读取写入到子文件中。

实测几乎可以跑满硬盘IO,由于本人使用内存创建的硬盘来进行分割操作,读写IO到达1.5G每秒。

若是分割的子文件数量过多(上百万乃至千万级别,且子文件不大,几十KB,请不要读写都在一个盘中,会严重减低效率)

顺带一提,若是磁盘性能够强,多进程还是单进程也差不了多少。

 1 import os
 2 import re
 3 import time
 4 
 5 def segmentation(ChildfileName,mainFd,startOffset,endOffset):
 6     '''
 7     mainFd:为被分割的文件的文件描述符,即用open函数打开的文件;
 8     ChildfileName:指定子文件的名称(路径)
 9     startOffset:此次分割的起始偏移量(针对主文件)
10     endOffset:此次分割的结束偏移量(针对主文件)
11     '''
12     with open(ChildfileName,'wb') as childFile:
13         readBlock = 10240;                                                      #每次读取的字节块大小
14         mainFd.seek(startOffset,0);
15         #print("偏移量:",startOffset);
16         interval = endOffset - mainFd.tell();
17         flag = True;        #是否写入文件
18         if flag:
19             while interval >= readBlock:
20                 childFile.write(mainFd.read(readBlock));
21                 interval = endOffset - mainFd.tell();
22         else:
23             print("开头:")
24             print(mainFd.read(1024));
25             while interval >= readBlock:
26                 a = mainFd.read(10240)
27                 interval = endOffset - mainFd.tell();
28         if interval > 0:
29             tail = mainFd.read(interval);
30             childFile.write(tail)
31             print("结尾:")
32             print(tail);
33         else:
34             print("写入文件的数据大小有误");
35     #print("endOffset:{}\tstartOffset:{}".format(endOffset,startOffset))
36 
37 filePath = r'z:\allcd.xml.res'
38 allcd_size = os.stat(filePath).st_size;         #获取文件的大小,字节数
39 childFileNumber = 5;                            #分割成几份
40 startOffset = 0;                                #起始偏移量
41 
42 endOffset = int(allcd_size / childFileNumber);  #结束偏移量
43 targetPath = r'z:\temp\\'                       #输出路径
44 print("文件大小:",str(allcd_size)," 字节。");
45 
46 for i in range(childFileNumber):
47     allcd = open(filePath,'rt');
48     allcd.seek(endOffset,0);
49     if i < childFileNumber -1:
50         while True:
51             str = allcd.readline();
52             if len(re.findall(r'^</objects>',str)) != 0:
53                 #print (str);
54                 endOffset =  allcd.tell() - len(str) + len("</objects>");
55                 break;
56     allcd.close();
57     allcd = open(filePath,'rb');
58     
59     print("startOffset:{};endOffset:{}".format(startOffset,endOffset));
60     segmentation(targetPath + repr(i+1) + '.xml.res' ,allcd, startOffset,endOffset);
61  
62     
63     #重新计算偏移量
64     startOffset = endOffset;
65     endOffset = int(allcd_size / childFileNumber *(i + 2));
66     allcd.close();
67 
68 print("程序执行花费时间:{}".format(time.process_time()));

多进程版本,区别不大

import os
import re
from multiprocessing import Process
import time


def segmentation(ChildfileName,mainFliePath,startOffset,endOffset):
    '''
    mainFliePath:为被分割的文件的路径,
    ChildfileName:指定子文件的名称(路径)
    startOffset:此次分割的起始偏移量(针对主文件)
    endOffset:此次分割的结束偏移量(针对主文件)
    '''
    print(ChildfileName)
    mainFd = open(mainFliePath,'rb');
    with open(ChildfileName,'wb') as childFile:
        readBlock = 10240;                      #每次读取的字节块大小
        mainFd.seek(startOffset,0);
        interval = endOffset - mainFd.tell();

        while interval >= readBlock:
            childFile.write(mainFd.read(readBlock));
            interval = endOffset - mainFd.tell();
        if interval > 0:
            tail = mainFd.read(interval);
            childFile.write(tail)
            #print("结尾:")
            #print(tail);
        else:
            print("写入文件的数据大小有误");
    #print("endOffset:{}\tstartOffset:{}".format(endOffset,startOffset))
    mainFd.close();
if __name__ == '__main__':
    filePath = r'z:\allcd.xml.res'
    allcd_size = os.stat(filePath).st_size;         #获取文件的大小,字节数
    childFileNumber = 5;                            #分割成几份
    startOffset = 0;                                #起始偏移量
    ProcessDict = {};                               #进程对象字典,以先后顺序为键值
    endOffset = int(allcd_size / childFileNumber);  #结束偏移量
    targetPath = r'z:\temp'                         #输出路径
    print("文件大小:",str(allcd_size)," 字节。");

    for i in range(childFileNumber):
        allcd = open(filePath,'rt');
        allcd.seek(endOffset,0);
        if i < childFileNumber -1:
            while True:
                str = allcd.readline();
                if len(re.findall(r'^</objects>',str)) != 0:
                    #print (str);
                    endOffset =  allcd.tell() - len(str) + len("</objects>");
                    break;
        allcd.close();
        #allcd = open(filePath,'rb');
        
        print("startOffset:{};endOffset:{}".format(startOffset,endOffset));
        
        ChildfileName=targetPath +"\\" + repr(i+1) + r'.xml.res'
        ProcessDict[i] = Process(target=segmentation,args=(ChildfileName ,filePath, startOffset,endOffset,));
        ProcessDict[i].start();
        #segmentation(ChildfileName,filePath, startOffset,endOffset);
        
        #重新计算偏移量
        startOffset = endOffset;
        endOffset = int(allcd_size / childFileNumber *(i + 2));
        #allcd.close();
    #激活各个子进程
    for key in ProcessDict.keys():
        print(ProcessDict[key])
        #ProcessDict[key].start();
        
    #检查子进程是否结束
    while True:
        endNumber = 0;
        for key in ProcessDict.keys():
            if ProcessDict[key].is_alive():
                print("{}号进程正在执行。".format(key));
            else:
                print("{}号进程已结束。".format(key));
                endNumber += 1;
        if endNumber == childFileNumber:
            print("所有进程执行均完毕");
            break;
        else:
            time.sleep(5);

    #print("程序执行花费时间:{}".format(time.process_time()));

 

推荐阅读