Efficiently creating a 1% sample with multiprocessing in Python

Problem description

I am trying to process a large (300GB) data set line by line using multiprocessing. I want to define a 1% random sample based on one variable. My first step is to define the sample, and then I want to read the data file with multiprocessing. I am guessing the script will run faster if the part that defines the set used for the random sample does not run for every child. However, when I try to move that part of the script into if __name__ == "__main__":, the child processes no longer seem to recognize the random sample from the parent. I get the error:

NameError: name 'id_pct1' is not defined

Where is the most efficient place to put the part of the script that defines the random sample?

import os
import random
import multiprocessing as mp

#define sample
uid = list(line.strip() for line in open('Subsets/unique_ids_final.txt'))
pct1 = round(len(uid)/100)
random.seed(1)
id_pct1 = set(random.sample(uid, k=pct1))
id_pct1.add(vname)  # vname is defined elsewhere in the script

#read original file and write 1% sample using multiprocessing
def worker(chunkStart, chunkSize, q):
    with open('myfile.txt') as f:
        tlines = []
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            data = line.split('*')
            if data[30] in id_pct1: tlines.append(line)
        q.put(tlines)
        return tlines

def chunkify(fname,size=1024*1024):
    fileEnd = os.path.getsize(fname)
    with open(fname, 'r') as f:
        chunkEnd2 = 0
        while True:
            chunkStart = chunkEnd2
            f.seek(chunkStart)
            f.read(size)
            chunkEnd1 = f.tell()
            f.readline()
            chunkEnd2 = f.tell()
            chunkSz = 1024*1024 + chunkEnd2 - chunkEnd1 - 1
            yield chunkStart, chunkSz
            if chunkEnd2 >= fileEnd:
                break

def listener(q):
    with open('myfile1pct.txt', 'w') as out_f1:
        while True:
            m = q.get()
            if m == 'kill': break
            else:
                for line in m:
                    out_f1.write(line+'\n')
                    out_f1.flush()

def main():

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool()

    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for chunkStart, chunkSize in chunkify('myfile.txt'):
        jobs.append(pool.apply_async(worker,(chunkStart,chunkSize,q)))

    for job in jobs:
        job.get()

    q.put('kill')
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
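
The NameError above is the typical symptom of defining the sample only inside the if __name__ == "__main__": block while the workers are started with the spawn start method (the default on Windows, and on macOS since Python 3.8): each child re-imports the script, the guarded block is skipped there, and id_pct1 therefore never exists in the child. A minimal sketch of that failure mode, using hypothetical names (demo.py, sample_ids):

# demo.py -- minimal reproduction of the NameError (hypothetical example)
import multiprocessing as mp

def worker(_):
    # under spawn, each child re-imports this module, but the guarded block
    # below does not run there, so sample_ids is never defined in the child
    return 1 if 'a' in sample_ids else 0   # raises NameError in the child

if __name__ == '__main__':
    mp.set_start_method('spawn')    # make the behaviour reproducible everywhere
    sample_ids = {'a', 'b', 'c'}    # only exists in the parent process
    with mp.Pool(2) as pool:
        print(pool.map(worker, range(4)))   # -> NameError: name 'sample_ids' is not defined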

Tags: python, multiprocessing

Solution


If you want these items to be available throughout the program, you would have to declare them with the global keyword. However, declaring global variables is generally considered bad practice. You should instead consider incorporating your #define sample logic into your worker function, like so:

import os
import random
import multiprocessing as mp

#read original file and write 1% sample using multiprocessing
def worker(chunkStart, chunkSize, q):
    #define sample here so every child process builds its own id_pct1
    uid = list(line.strip() for line in open('Subsets/unique_ids_final.txt'))
    pct1 = round(len(uid)/100)
    random.seed(1)
    id_pct1 = set(random.sample(uid, k=pct1))
    id_pct1.add(vname)  # vname is defined elsewhere in the original script

    with open('myfile.txt') as f:
        tlines = []
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            data = line.split('*')
            if data[30] in id_pct1: tlines.append(line)
        q.put(tlines)
        return tlines

def chunkify(fname,size=1024*1024):
    #yield (start offset, chunk size) pairs that each end on a full line
    fileEnd = os.path.getsize(fname)
    with open(fname, 'r') as f:
        chunkEnd2 = 0
        while True:
            chunkStart = chunkEnd2
            f.seek(chunkStart)
            f.read(size)
            chunkEnd1 = f.tell()
            f.readline()
            chunkEnd2 = f.tell()
            chunkSz = 1024*1024 + chunkEnd2 - chunkEnd1 - 1
            yield chunkStart, chunkSz
            if chunkEnd2 >= fileEnd:
                break

def listener(q):
    #single writer: pull matched lines off the queue and append them to the output file
    with open('myfile1pct.txt', 'w') as out_f1:
        while True:
            m = q.get()
            if m == 'kill': break
            else:
                for line in m:
                    out_f1.write(line+'\n')
                    out_f1.flush()

def main():

    manager = mp.Manager()
    q = manager.Queue()
    pool = mp.Pool()

    #start the listener first so it is ready to receive results from the workers
    watcher = pool.apply_async(listener, (q,))

    jobs = []
    for chunkStart, chunkSize in chunkify('myfile.txt'):
        jobs.append(pool.apply_async(worker,(chunkStart,chunkSize,q)))

    for job in jobs:
        job.get()

    q.put('kill')
    pool.close()
    pool.join()

if __name__ == '__main__':
    main()
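
A note on efficiency: in the version above, worker() rebuilds id_pct1 (and re-reads unique_ids_final.txt) once per chunk, because worker runs once per task. If that overhead matters for a 300GB input, one alternative worth considering is the pool's initializer argument, which runs once in each worker process and can store the sample in a module-level name that every later task in that process reuses. A rough sketch of that idea; the name init_sample is illustrative and not part of the original code:

import os
import random
import multiprocessing as mp

def init_sample():
    #runs once in every worker process when the pool starts up
    global id_pct1
    uid = list(line.strip() for line in open('Subsets/unique_ids_final.txt'))
    pct1 = round(len(uid)/100)
    random.seed(1)
    id_pct1 = set(random.sample(uid, k=pct1))
    #id_pct1.add(vname) from the original script would also go here

def worker(chunkStart, chunkSize, q):
    #id_pct1 already exists in this process, set up by init_sample
    with open('myfile.txt') as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        tlines = [line for line in lines if line.split('*')[30] in id_pct1]
        q.put(tlines)
        return tlines

#the only change needed in main() is how the pool is created:
#    pool = mp.Pool(initializer=init_sample)
#chunkify(), listener() and the rest of main() stay the same as above

Every process in the pool (including the one that ends up running listener) executes init_sample once, which costs one extra read of the ids file but nothing more.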
