首页 > 解决方案 > 多处理池比手动实例化多个进程慢得多

问题描述

我正在从一个大文件中读取一个块,将其作为行列表加载到内存中,然后在每一行上处理一个任务。

顺序解决方案花费的时间太长,所以我开始研究如何并行化它。

我想出的第一个解决方案是使用 Process 并管理列表中的每个子进程部分。

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    for piece in read_in_chunks(file, CHUNKSIZE):
        jobs = []
        piece_list = piece.splitlines()
        piece_list_len = len(piece_list)
        item_delta = round(piece_list_len/N_PROCESSES)
        start = 0
        for process in range(N_PROCESSES):
            finish = start + item_delta
            p = mp.Process(target=work, args=(piece_list[start:finish]))
            start = finish
            jobs.append(p)
            p.start()
        for job in jobs:
            job.join()

它在大约 2498 毫秒内完成每个块。

然后我发现了 Pool 工具来自动管理切片。

import multiprocessing as mp

BIG_FILE_PATH = 'big_file.txt'
CHUNKSIZE = '1000000'
N_PROCESSES = mp.cpu_count()


def read_in_chunks(file_object, chunk_size=1024):
    while True:
        data = file_object.read(chunk_size)
        if not data:
            break
        yield data


with open(BIG_FILE_PATH, encoding="Latin-1") as file:
    with mp.Pool(N_PROCESSES) as pool:
        for piece in read_in_chunks(file, CHUNKSIZE):
            piece_list = piece.splitlines()
            pool.map(work, piece_list)

它在大约 15540 毫秒内完成每个块,比手动慢 6 倍,但仍比顺序快。

我用错了泳池吗?有没有更好或更快的方法来做到这一点?

感谢您的阅读。

更新

正如 Hannu 建议的那样,游泳池的开销很大。

Process 方法调用的工作函数需要一个行列表。

由于 Pool 决定切片的方式,由 Pool 方法调用的工作函数需要一行。

我不太确定如何让池一次给某个工人多条线路。

那应该能解决问题吗?

更新 2

最后一个问题,有没有第三种更好的方法呢?

标签: pythonperformancemultiprocessingpython-multiprocessingpool

解决方案


我对此并不完全确定,但在我看来,您的程序在提交给工人的内容上有很大的不同。

在您的 Process 方法中,您似乎提交了大量行:

p = mp.Process(target=work, args=(piece_list[start:finish]))

但是当你使用 Pool 时,你会这样做:

for piece in read_in_chunks(file, CHUNKSIZE):
    piece_list = piece.splitlines()
    pool.map(work, piece_list)

您以块的形式读取文件,但是当您使用 时splitlines,您的piece_list可迭代对象会提交一个单位。

这意味着在您的流程方法中,您提交与 CPU 一样多的子任务,但在您的池方法中,您提交与源数据行数一样多的任务。如果你有很多行,这将在你的池中产生大量的编排开销,因为每个工作人员一次只处理一行,然后完成,返回结果,然后池将另一行提交给新释放的工作人员。

如果这就是这里发生的事情,它肯定解释了为什么 Pool 需要更长的时间才能完成。

如果您将阅读器用作可迭代对象并跳过行拆分部分会发生什么:

pool.map(work, read_in_chunks(file, CHUNKSIZE))

推荐阅读