How to use multiprocessing in Python on for loop to generate nested dictionary?

Problem description

I am currently generating a nested dictionary that stores some arrays by using a nested for loop. Unfortunately, it takes quite some time. I realized that the server I am working on has a few cores available, so I was wondering whether Python's multiprocessing library could help speed up the creation of the dictionary.

The nested for loop looks something like this (the actual computation is heavier and more complex):

import numpy as np

data_dict = {}
for s in range(1,5):
    data_dict[s] = {}
    for d in range(1,5):
        if s * d > 4:
            data_dict[s][d] = np.zeros((s,d))
        else:
            data_dict[s][d] = np.ones((s,d))

So this is what I tried:

from multiprocessing import Pool
import numpy as np

data_dict = {}
def process():
    #sci=fits.open('{}.fits'.format(name))
    for s in range(1,5):
        data_dict[s] = {}
        for d in range(1,5):
            if s * d > 4:
                data_dict[s][d] = np.zeros((s,d))
            else:
                data_dict[s][d] = np.ones((s,d))

if __name__ == '__main__':
    pool = Pool()                         # Create a multiprocessing Pool
    pool.map(process)  

But pool.map (last line) seems to require an iterable, and I'm not sure what to pass there.

Tags: python, arrays, numpy, for-loop, multiprocessing

Solution


In my opinion, the real question is what kind of processing is needed to compute the entries of the dictionary and how many entries there are.

The kind of processing is essential for understanding whether multiprocessing can significantly speed up the creation of the dictionary. If your computation is I/O bound, you should use multithreading; if it is CPU bound, you should use multiprocessing. You can find more about this here. A minimal sketch of the two models is shown right after this paragraph.
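
For a quick feel of the distinction, both models can be tried behind the same interface via concurrent.futures. The sketch below is illustrative only; compute is a hypothetical placeholder standing in for the real per-entry work:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
import numpy as np

def compute(s, d):
    # Hypothetical stand-in for the real per-entry computation
    return np.ones((s, d)) * s / (d + 1)

if __name__ == '__main__':
    ss, ds = zip(*[(s, d) for s in range(1, 5) for d in range(1, 5)])

    # CPU-bound work: a process pool sidesteps the GIL
    with ProcessPoolExecutor() as ex:
        values = list(ex.map(compute, ss, ds))

    # If compute() spent most of its time waiting on disk or network
    # (I/O bound), a thread pool would be the lighter-weight choice
    with ThreadPoolExecutor() as ex:
        values_io = list(ex.map(compute, ss, ds))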

Assuming that the value of each entry can be computed independently and that this computation is CPU bound, let's benchmark the difference between a single-process and a multiprocess implementation (based on the multiprocessing library).

The following code is used to test the two approaches in some scenarios, varying the complexity of the computation needed for each entry and the number of entries (for the multiprocess implementation, 7 processes were used).

import timeit
import numpy as np

def some_fun(s, d, n=1):
    """A function with an adaptable complexity"""
    a = s * np.ones(np.random.randint(1, 10, (2,))) / (d + 1)
    for _ in range(n):
        a += np.random.random(a.shape)
    return a

# Code to create dictionary with only one process
setup_simple = "from __main__ import some_fun, n_first_level, n_second_level, complexity"

code_simple = """
data_dict = {}
for s in range(n_first_level):
    data_dict[s] = {}
    for d in range(n_second_level):
        data_dict[s][d] = some_fun(s, d, n=complexity)
"""

# Code to create a dictionary with multiprocessing: we are going to use all the available cores except 1
setup_mp = """import numpy as np
import multiprocessing as mp
import itertools
from functools import partial
from __main__ import some_fun, n_first_level, n_second_level, complexity

n_processes = mp.cpu_count() - 1
# Uncomment if you want to know how many concurrent processes you are going to use
# print(f'{n_processes} concurrent processes')
"""

code_mp = """
with mp.Pool(processes=n_processes) as pool:
    dict_values = pool.starmap(partial(some_fun, n=complexity), itertools.product(range(n_first_level), range(n_second_level)))
data_dict = {
    k: dict(zip(range(n_second_level), dict_values[k * n_second_level: (k + 1) * n_second_level]))
    for k in range(n_first_level)
}
"""

# Time the code with different settings
print('Execution time on 10 repetitions: mean [std]')
for label, complexity, n_first_level, n_second_level in (
    ("TRIVIAL FUNCTION", 0, 10, 10),
    ("TRIVIAL FUNCTION", 0, 500, 500),
    ("SIMPLE FUNCTION", 5, 500, 500),
    ("COMPLEX FUNCTION", 50, 100, 100),
    ("HEAVY FUNCTION", 1000, 10, 10),
):
    print(f'\n{label}, {n_first_level * n_second_level} dictionary entries')
    for l, t in (
        ('Single process', timeit.repeat(stmt=code_simple, setup=setup_simple, number=1, repeat=10)),
        ('Multiprocess', timeit.repeat(stmt=code_mp, setup=setup_mp, number=1, repeat=10)),
    ):
        print(f'\t{l}: {np.mean(t):.3e} [{np.std(t):.3e}] seconds')

These are the results:

Execution time on 10 repetitions: mean [std]

TRIVIAL FUNCTION, 100 dictionary entries
    Single process: 7.752e-04 [7.494e-05] seconds
    Multiprocess: 1.163e-01 [2.024e-03] seconds

TRIVIAL FUNCTION, 250000 dictionary entries
    Single process: 7.077e+00 [7.098e-01] seconds
    Multiprocess: 1.383e+00 [7.752e-02] seconds

SIMPLE FUNCTION, 250000 dictionary entries
    Single process: 1.405e+01 [1.422e+00] seconds
    Multiprocess: 2.858e+00 [5.742e-01] seconds

COMPLEX FUNCTION, 10000 dictionary entries
    Single process: 1.557e+00 [4.330e-02] seconds
    Multiprocess: 5.383e-01 [5.330e-02] seconds

HEAVY FUNCTION, 100 dictionary entries
    Single process: 3.181e-01 [5.026e-03] seconds
    Multiprocess: 1.171e-01 [2.494e-03] seconds

As you can see, assuming a CPU-bound computation, the multiprocess approach achieves better results in most scenarios. The single-process approach should be preferred only if the computation for each entry is very light and/or the number of entries is very limited.
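
For reference, here is a sketch of how the same starmap pattern used in the benchmark could be applied to the toy loop from the question; make_entry stands in for the real, heavier computation:

from multiprocessing import Pool
from itertools import product
import numpy as np

def make_entry(s, d):
    # The toy computation from the question; the real one would be heavier
    return np.zeros((s, d)) if s * d > 4 else np.ones((s, d))

if __name__ == '__main__':
    s_range = range(1, 5)
    d_range = range(1, 5)
    with Pool() as pool:
        # starmap unpacks each (s, d) pair into the two arguments of make_entry;
        # results come back in the same order as product(s_range, d_range)
        values = pool.starmap(make_entry, product(s_range, d_range))
    # Rebuild the nested dictionary from the flat list of results
    data_dict = {
        s: {d: values[i * len(d_range) + j] for j, d in enumerate(d_range)}
        for i, s in enumerate(s_range)
    }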

On the other hand, the improvement provided by multiprocessing comes at a cost. For example, if the computation for each entry uses a significant amount of memory, you may run into an out-of-memory error, forcing you to rework your code and make it more complex in order to find the right balance between memory usage and execution time. If you look around, there are plenty of questions asking how to solve memory issues caused by a suboptimal use of multiprocessing. In other words, your code will be harder to read and maintain.
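
If memory does become an issue, one common mitigation is to consume the results lazily instead of collecting them all at once. The following is a sketch under that assumption, using imap with an explicit chunksize; compute_entry is a hypothetical helper taking a key pair:

import itertools
import multiprocessing as mp
import numpy as np

def compute_entry(sd):
    # Hypothetical helper: unpack the (s, d) pair and compute one value
    s, d = sd
    value = np.zeros((s, d)) if s * d > 4 else np.ones((s, d))
    return s, d, value

if __name__ == '__main__':
    keys = itertools.product(range(1, 5), range(1, 5))
    data_dict = {}
    with mp.Pool() as pool:
        # imap yields results lazily (in order), so the parent can store each
        # value as it arrives instead of holding one big list of all results
        for s, d, value in pool.imap(compute_entry, keys, chunksize=16):
            data_dict.setdefault(s, {})[d] = value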

To sum up, even when a speed-up is possible, you should judge whether the improvement in execution time is worth it.

