首页 > 解决方案 > 使用多进程并行处理标准输入行

问题描述

我有一个基本脚本,可以逐行读取 csv 文件,对每一行进行一些基本处理,并使用它来决定是否要保留该行。如果它保留它,它会进入一个缓冲区数组,然后稍后写入(如果文件不适合 RAM,则适当地处理分块,但稍后会详细介绍)。我被告知,由于 HDD 的魔力,我将从写入缓冲区中受益匪浅,而不仅仅是直接循环每一行。

我正在尝试通过在与读取该行的标准输入不同的进程上处理该行来查看是否可以加快此速度。很明显,该脚本受 i/o 速度限制,所以我希望它尽可能多地运行,而不是等待 cpu 的东西。

为了尝试这个,我设置了 2 个进程,使用管道将线路从一个移动到另一个。这似乎可行,但是比我只是在一个循环中对馈入的线路进行所有操作时慢一个数量级。

基本实现片段

outStream = open("updated.csv",'w')
outputBuffer = []
for line in sys.stdin:  
    row = line.strip()
    index = parseRow(row) # CPU-bound process

    if index not in mainList:
        outputBuffer.append(row)

    # some test to see if memory is filling up, if so dump it and reset buffer, just here to show logic atm
    if(getMem()>60):
        writeBuffer(outputBuffer, outStream)
        outputBuffer = []

# make sure last batch is written out
if outputBuffer != []:
    writeBuffer(outputBuffer, outStream)
outStream.close() 

尝试并行解决方案

def reader( lineConn, fileno ):
    for line in os.fdopen(fileno):
        lineConn.send(line)
    lineConn.send("end")

def parser( lineConn ):
    outputBuffer = []
    outStream = open("updated.csv",'w')
    while 1:
        line = lineConn.recv()
        if(line == "end"):
            break
        row = line.strip()
        index = parseRow(row)

        if index not in mainList:
            outputBuffer.append(row)

    writeBuffer(outputBuffer, outStream)

############ main
outputBuffer = []

lineRead, lineParse = Pipe()

io = sys.stdin.fileno() 
p1 = Process(target=reader, args=(lineRead,io))
p2 = Process(target=parser, args=(lineParse,)) 

p1.start()
p2.start()
p1.join()
p2.join()

我怀疑这与发送比接收慢有关,但我还在解析端设置了一个等待计时器(即如果 poll() 为假,那么就稍等一下)但即便如此,我添加发送(行)的东西它会停止,大概是因为我对它的工作原理不够了解。

我有一个宏伟的计划,以确保在需要完成写入时暂停读取,即文件不适合 RAM,但在此之前就卡住了。

顺便说一句,解析率低的部分原因是因为我需要查找 csv 的逗号部分(仅 2 col)来挖掘索引,这意味着读取该行以获取该行然后再次读取它找到逗号并抓住它之前的整数 - 有没有办法让标准输入处理其中的一些?即阅读该行并返回您找到的第一个逗号的位置?

重要编辑 实际上刚刚意识到任务需要将输出发送到标准输出而不是直接写入文件,所以这是否意味着我不需要缓冲,因为 HDD i/o 仅在一端?

标签: pythonmultiprocessing

解决方案


推荐阅读