首页 > 解决方案 > 在使用多处理包运行多个进程时在 python 中拥有一个共享字典

问题描述

我试图弄清楚我如何使用该multiprocessing库来完成一项任务,并且我一直在坐立不安以了解事情是如何工作的。我要解决的任务是计算一个非常大的文件(文本语料库)中特定单词的出现次数,我编写了这个基本脚本来解决更小规模的问题,然后再尝试更大的文件。

我从一个嵌套的名称列表开始(4 个块,用于表示一个分块文件),我使用一个队列来跟踪每个块中名称计数的字典,然后在最后合并它。这就是我到目前为止所拥有的。

import multiprocessing
from multiprocessing import Process, Queue, Pool, cpu_count
from timeit import default_timer as timer
import random

names = ["Julie", "Ben", "Rob", "Samantha", "Alice", "Jamie"]
file_chunks = [
    [random.choice(names) for _ in range(1000)] for _ in range(4)
] # nested list of 4 chunks each containing 1000 randomly selected names from the <names> list


def count_occurances_in_part(queue, chunk):
    name_count = {name:0 for name in names}
    for name in chunk:
        name_count[name] += 1

    queue.put(name_count)
    

def main():
    queue = Queue()
    n_cpus = cpu_count()
    print(f"Dividing tasks between {n_cpus} CPUs")
    
    processes = [Process(target=count_occurances_in_part, args=(queue, chunk)) for chunk in file_chunks]
    
    start = timer()
    
    for p in processes:
        p.start()
    
    for p in processes:
        p.join()
    
    end = timer()
    print(f'elapsed time: {end - start}')
    
    results = [queue.get() for _ in processes]
    
    final_dict = {name:0 for name in names}
    for result in results:
        for name, count in result.items():
            final_dict[name] += count
            
    print(final_dict)

在这种情况下,这是一个示例输出

Dividing tasks between 8 CPUs
elapsed time: 0.07983553898520768
{'Julie': 662, 'Ben': 698, 'Rob': 690, 'Samantha': 669, 'Alice': 653, 'Jamie': 628}

但是,我很好奇是否有办法让我拥有一个由所有进程共享的字典,这样我就可以在那里更新计数,而不必在最后合并内容。我在读到使用队列是首选方式,但我不知道队列是否可行。

另一个问题是是否有一些简单的方法可以在最后合并字典,如果我的最终字典非常大并且循环需要很长时间,则无需循环运行匹配键的计数时间?

另外,我没有太多使用多处理包,所以如果您有任何其他建议,将不胜感激!

标签: pythonpython-3.xmultiprocessingpython-multiprocessing

解决方案


推荐阅读