Is this the most I can get out of Python multiprocessing?

Problem description

I have data in a text file. Each line is one computation to perform. The file has about 100 000 000 lines.

First I load everything into memory, then I have a method that performs the computation and returns the result:

def process(data_line):
    #do computation
    return result

Then I call it with packets of 2000 lines, like this, and save the results to disk:

from multiprocessing import Pool

POOL_SIZE = 15  # nbcore - 1
PACKET_SIZE = 2000
pool = Pool(processes=POOL_SIZE)

data_lines = util.load_data_lines(to_be_computed_filename)
number_of_packets = len(data_lines) // PACKET_SIZE
for i in range(number_of_packets):
    lines_packet = data_lines[:PACKET_SIZE]
    data_lines = data_lines[PACKET_SIZE:]
    results = pool.map(process, lines_packet)
    save_computed_data_to_disk(to_be_computed_filename, results)

# process the last packet, which is smaller
results.extend(pool.map(process, data_lines))
save_computed_data_to_disk(to_be_computed_filename, results)
print("Done")

The problem is that while I am writing to disk, my CPUs are not computing anything, even though the machine has 8 cores. Looking at the Task Manager, it seems that quite a lot of CPU time is being lost.

I have to write to disk after the computation is done because the results are 1000 times larger than the input. Either way, I have to write to disk at some point; if the time is not lost here, it will be lost later.

[Image: CPU time in Task Manager]

What could I do so that one core writes to disk while the other cores keep computing? Should I switch to C?

At this rate I can process 100 million lines in 75 hours, but I have 12 billion lines to process, so any improvement is welcome.
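For scale, a quick back-of-the-envelope projection at that rate (assuming it stays roughly constant over the full dataset):

# Rough projection at the stated rate (assumption: the rate stays constant).
hours_per_100m_lines = 75
total_lines = 12_000_000_000
total_hours = total_lines / 100_000_000 * hours_per_100m_lines
print(total_hours, total_hours / 24)   # 9000.0 hours, 375.0 days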

Timing example:

Processing packet 2/15 953 of C:/processing/drop_zone\to_be_processed_txt_files\t_to_compute_303620.txt
Launching task and waiting for it to finish...
Task completed, Continuing
Packet was processed in 11.534576654434204 seconds
We are currently going at a rate of 0.002306915330886841 sec/words
Which is 433.47928145051293 words per seconds
Saving in temporary file
Printing writing 5000 computed line to disk took 0.04400920867919922 seconds
saving word to resume from : 06 20 25 00 00
Estimated time for processing the remaining packets is : 51:19:25

Tags: python, python-3.x, multithreading

Solution


Note: SharedMemory is only available in Python >= 3.8, since that is where it first appeared.
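A minimal sketch of the SharedMemory lifecycle the solution below relies on (create a block, attach to it by name, close each handle, unlink it once):

from multiprocessing.shared_memory import SharedMemory

shm = SharedMemory(create=True, size=1024)   # create a 1 KiB block (Python >= 3.8)
shm.buf[:5] = b"hello"                       # write through the memoryview

other = SharedMemory(name=shm.name)          # attach by name, typically from another process
print(bytes(other.buf[:5]))                  # b'hello'

other.close()                                # each handle is closed by its owner
shm.close()
shm.unlink()                                 # the creator frees the block exactly once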

Launch three kinds of processes: a Reader, one or more Processors, and a Writer.

Have the Reader process read the file incrementally and share the results via shared_memory, passing handles through a Queue.

Have the Processor(s) consume the Queue and the shared_memory, and return the results through another Queue, again as shared_memory.

Have the Writer process consume the second Queue and write to the destination file.

Have all of them communicate, through some Events or a DictProxy, with the MainProcess, which acts as the orchestrator.


Example:

import time
import random
import hashlib
import multiprocessing as MP

from queue import Queue, Empty

# noinspection PyCompatibility
from multiprocessing.shared_memory import SharedMemory

from typing import Dict, List


def readerfunc(
        shm_arr: List[SharedMemory], q_out: Queue, procr_ready: Dict[str, bool]
):
    numshm = len(shm_arr)
    for batch in range(1, 6):
        print(f"Reading batch #{batch}")
        for shm in shm_arr:
            #### Simulated Reading ####
            for j in range(0, shm.size):
                shm.buf[j] = random.randint(0, 255)
            #### ####
            q_out.put((batch, shm))
        # Need to sync here because we're reusing the same SharedMemory,
        # so gotta wait until all processors are done before sending the
        # next batch
        while not q_out.empty() or not all(procr_ready.values()):
            time.sleep(1.0)


def processorfunc(
        q_in: Queue, q_out: Queue, suicide: type(MP.Event()), procr_ready: Dict[str, bool]
):
    pname = MP.current_process().name
    procr_ready[pname] = False
    while True:
        time.sleep(1.0)
        procr_ready[pname] = True
        if q_in.empty() and suicide.is_set():
            break
        try:
            batch, shm = q_in.get_nowait()
        except Empty:
            continue
        print(pname, "got batch", batch)
        procr_ready[pname] = False
        #### Simulated Processing ####
        h = hashlib.blake2b(shm.buf, digest_size=4, person=b"processor")
        time.sleep(random.uniform(5.0, 7.0))
        #### ####
        q_out.put((pname, h.hexdigest()))


def writerfunc(q_in: Queue, suicide: type(MP.Event())):
    while True:
        time.sleep(1.0)
        if q_in.empty() and suicide.is_set():
            break
        try:
            pname, digest = q_in.get_nowait()
        except Empty:
            continue
        print("Writing", pname, digest)
        #### Simulated Writing ####
        time.sleep(random.uniform(3.0, 6.0))
        #### ####
        print("Writing", pname, digest, "done")


def main():
    shm_arr = [
        SharedMemory(create=True, size=1024)
        for _ in range(0, 5)
    ]
    q_read = MP.Queue()
    q_write = MP.Queue()
    procr_ready = MP.Manager().dict()
    poison = MP.Event()
    poison.clear()

    reader = MP.Process(target=readerfunc, args=(shm_arr, q_read, procr_ready))

    procrs = []
    for n in range(0, 3):
        p = MP.Process(
            target=processorfunc, name=f"Proc{n}", args=(q_read, q_write, poison, procr_ready)
        )
        procrs.append(p)

    writer = MP.Process(target=writerfunc, args=(q_write, poison))

    reader.start()
    [p.start() for p in procrs]
    writer.start()

    reader.join()
    print("Reader has ended")

    while not all(procr_ready.values()):
        time.sleep(5.0)
    poison.set()
    [p.join() for p in procrs]
    print("Processors have ended")

    writer.join()
    print("Writer has ended")

    [shm.close() for shm in shm_arr]
    [shm.unlink() for shm in shm_arr]


if __name__ == '__main__':
    main()
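To adapt this skeleton to the workload in the question, the simulated reading, processing, and writing sections would be replaced with real file I/O and the question's process function. A rough, hypothetical sketch of the Processor's inner step, assuming the Reader packs each packet of lines into a block as NUL-padded UTF-8:

from multiprocessing.shared_memory import SharedMemory

def process_shared_block(shm: SharedMemory) -> list:
    # Hypothetical encoding scheme: the Reader wrote one packet of text lines
    # into the block as UTF-8 and padded the remainder with NUL bytes.
    text = bytes(shm.buf).rstrip(b"\x00").decode("utf-8")
    # `process` is the per-line computation from the question.
    return [process(line) for line in text.splitlines()]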
