首页 > 解决方案 > 为大型itertools产品实现内存高效的ThreadPool

问题描述

我有一个包含 15 个变量的字典,每个变量有 3 个值,我需要为此生成所有可能组合的乘积(3**15 = 14.3M 组合)。我正在使用带有 12 核处理器的多线程来处理组合(可能会跳转到 64 核)。

itertools.product用来生成不同的组合,并ThreadPool用于imap_unordered运行多处理。此外,我deque会在结果可用时立即删除它。但是,我发现内存消耗高达 2.5GB 左右。我知道这itertools.product是一个可迭代的,因此不应该在内存中存储太多数据,但似乎并非如此。

下面是我的代码,我想知道是否有人可以帮助我弄清楚如何更好地优化内存利用率。

另外,我想知道块大小如何影响imap_unordered内存效率。我尝试了不同的数字来查看它如何影响内存使用(包括 10、100、1000、10000),但除了将内存利用率稳定在 2.5GB 左右之外,它似乎没有太大影响。如果我不包括块大小,内存往往会爆炸> 5GB。

我还尝试将线程数从 12 更改为 1,这也没有影响内存使用。但是,使用单处理器实现(在下面注释掉)将内存使用量减少到仅约 30MB。

import numpy as np
import itertools
import multiprocessing
import queue
import functools
from multiprocessing import pool, dummy

def dummy_func(values, keys):
    print( dict(zip(keys, values)) )
    return

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'], 
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'], 
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'], 
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'], 
                  'i': ['7p', '16p', '22p'], 
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'], 
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'], 
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']}
    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    # process simulations for all permutations using multi-threading
    with multiprocessing.pool.ThreadPool(num_threads) as workers:
        queue.deque(workers.imap_unordered(functools.partial(dummy_func, keys=keys), 
                                           itertools.product(*map(parameters.get, keys)), 100))
    return

if __name__ == "__main__":
    main()

标签: pythonmultiprocessingthreadpoolitertools

解决方案


更新

如果您不想炸毁内存,则需要三件事:

  1. 您需要有一个迭代器来生成传递给您的值,以dummy_func增量方式生成值。itertools.product实际上在产生第一个值之前会在内存中生成所有值,所以无论你做什么它都会炸毁内存。
  2. 您必须使用一个函数来逐个处理可迭代deque对象,并为每个结果将结果附加到使用合适的非零maxlen参数初始化的结果中。您当前的代码正在使用函数deque的完整输出初始化 ,该输出map将具有传递的iterable的长度。这会破坏记忆。
  3. 即使您为工作函数生成值dummy_func,增量使用imap,您生成任务的速度也可能比生成结果的速度快,因此池的输入队列将继续增长,并且您将爆炸内存。

为了克服1中描述的问题。我正在使用permutations生成器功能。

为了克服 2 中描述的问题。我使用maxlen=10初始化了一个空双端队列。由于每个值都是从我返回的,dumy_func所以我会将它附加到双端队列。

要克服 3. 中描述的问题,您需要使用BoundedQueueProcessPoolorBoundedQueueThreadPool类。它使用该imap方法通过回调函数提交新任务来处理结果。它与标准池函数的不同之处在于,一旦输入队列大小达到池中的进程数或线程数视情况而定(您可以手动指定最大队列大小),它默认会阻止主线程提交更多任务使用max_waiting_tasks参数):

import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps, partial

name = 'bounded_pool'

class ImapResult():
    def __init__(self, semaphore, result):
        self.semaphore = semaphore
        self.it = result.__iter__()

    def __iter__(self):
        return self

    def __next__(self):
        try:
            elem = self.it.__next__()
            self.semaphore.release()
            return elem
        except StopIteration:
            raise
        except:
            self.semaphore.release()
            raise

class BoundedQueuePool:
    def __init__(self, semaphore):
        self.semaphore = semaphore

    def release(self, result, callback=None):
        self.semaphore.release()
        if callback:
            callback(result)

    def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None):
        self.semaphore.acquire()
        callback_fn = self.release if callback is None else partial(self.release, callback=callback)
        error_callback_fn = self.release if error_callback is None else partial(self.release, callback=error_callback)
        return super().apply_async(func, args, kwds, callback=callback_fn, error_callback=error_callback_fn)

    def imap(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self.semaphore.acquire()
                yield elem
        result = super().imap(func, new_iterable(iterable), chunksize)
        return ImapResult(self.semaphore, result)

    def imap_unordered(self, func, iterable, chunksize=1):
        def new_iterable(iterable):
            for elem in iterable:
                self.semaphore.acquire()
                yield elem
        result = super().imap_unordered(func, new_iterable(iterable), chunksize)
        return ImapResult(self.semaphore, result)

class BoundedQueueProcessPool(BoundedQueuePool, multiprocessing.pool.Pool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        BoundedQueuePool.__init__(self, multiprocessing.BoundedSemaphore(self._processes + max_waiting_tasks))

class BoundedQueueThreadPool(BoundedQueuePool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, max_waiting_tasks=None, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        if max_waiting_tasks is None:
            max_waiting_tasks = self._processes
        elif max_waiting_tasks < 0:
            raise ValueError(f'Invalid negative max_waiting_tasks value: {max_waiting_tasks}')
        BoundedQueuePool.__init__(self, threading.BoundedSemaphore(self._processes + max_waiting_tasks))

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

##################################################################

import queue
from itertools import permutations

def dummy_func(values, keys):
    #print( dict(zip(keys, values)))
    ...
    return dict(zip(keys, values))

def main():
    num_threads = multiprocessing.cpu_count()

    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  'k': ['4', '8', '11'], 'l': ['41', '77', '113'], 'm': ['4', '8', '11'],
                  'n': ['16p', '31p', '46p'], 'o': ['20n', '30n', '35n']
                  }

    # A more reasonably sized parameters:
    parameters = {'a': ['7.0', '9.0', '11.0'], 'b': ['125p', '200p', '275p'],
                  'c': ['320n', '440n', '560n'], 'd': ['400p', '500p', '600p'],
                  'e': ['262p', '374p', '486p'], 'f': ['13p', '25p', '37p'],
                  'g': ['19p', '40p', '61p'], 'h': ['7p', '16p', '22p'],
                  'i': ['7p', '16p', '22p'],
                  'j': ['0.7200000000000004', '1.1500000000000008', '1.5700000000000012'],
                  }


    keys = list(parameters)

    # process simulations for all permutations using single process
    #for values in itertools.product(*map(parameters.get, keys)):
    #    dummy_func(values, keys)

    q = queue.deque(maxlen=10)

    pool = BoundedQueueThreadPool(num_threads)
    for v in pool.imap(partial(dummy_func, keys=keys), permutations(parameters.values(), len(keys))):
        q.append(v)
    return q

if __name__ == '__main__':
    import time
    t = time.time()
    q = main()
    print(q)
    print(time.time() - t)

推荐阅读