首页 > 解决方案 > python过滤器+多处理+迭代器延迟加载

问题描述

我有一个二维数组,它产生一个巨大的(> 300GB)组合列表,所以我想对 itertools.combinations 生成的迭代器进行惰性迭代并并行化这个操作。问题是我需要过滤输出,而 Multiprocessing 不支持。我现有的解决方法需要将组合列表加载到内存中,由于列表的大小,这也不起作用。


n_nodes = np.random.randn(10, 100)
cutoff=0.3

def node_combinations(nodes):
    return itertools.combinations(list(range(len(nodes))), 2)    

def pfilter(func, candidates):
    return np.asarray([c for c, keep in zip(candidates, pool.map(func, candidates)) if keep])

def pearsonr(xy: tuple):
    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
    if correlation_coefficient >= cutoff:
            return True
        else:
            return False


edgelist = pfilter(pearsonr, node_combinations(n_nodes))

我正在寻找一种使用带过滤器而不是映射的多处理对大型迭代器进行惰性评估的方法。

标签: pythonfilterpython-multiprocessinglazy-evaluation

解决方案


Hoxha 的建议效果很好——谢谢!

@Dan 的问题是,即使是空列表也会占用内存,420 亿个配对在内存中接近 3TB。

这是我的实现:

import more_itertools
import itertools
import multiprocessing as mp
import numpy as np
import scipy
from tqdm import tqdm

n_nodes = np.random.randn(10, 100)
num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)
cpu_count = 8
cutoff=0.3

def node_combinations(nodes):
    return itertools.combinations(list(range(len(nodes))), 2)    

def edge_gen(xy_iterator: type(itertools.islice)):
    edges = []
    for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)
        if pearsonr(cand):
            edges.append(cand)

def pearsonr(xy: tuple):
    correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
    if correlation_coefficient >= cutoff:
            return True
        else:
            return False


slices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))
pool = mp.Pool(cpu_count)
results = pool.imap(edge_gen, slices)
pool.close()
pool.join()


推荐阅读