python - python过滤器+多处理+迭代器延迟加载
问题描述
我有一个二维数组,它产生一个巨大的(> 300GB)组合列表,所以我想对 itertools.combinations 生成的迭代器进行惰性迭代并并行化这个操作。问题是我需要过滤输出,而 Multiprocessing 不支持。我现有的解决方法需要将组合列表加载到内存中,由于列表的大小,这也不起作用。
n_nodes = np.random.randn(10, 100)
cutoff=0.3
def node_combinations(nodes):
return itertools.combinations(list(range(len(nodes))), 2)
def pfilter(func, candidates):
return np.asarray([c for c, keep in zip(candidates, pool.map(func, candidates)) if keep])
def pearsonr(xy: tuple):
correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
if correlation_coefficient >= cutoff:
return True
else:
return False
edgelist = pfilter(pearsonr, node_combinations(n_nodes))
我正在寻找一种使用带过滤器而不是映射的多处理对大型迭代器进行惰性评估的方法。
解决方案
Hoxha 的建议效果很好——谢谢!
@Dan 的问题是,即使是空列表也会占用内存,420 亿个配对在内存中接近 3TB。
这是我的实现:
import more_itertools
import itertools
import multiprocessing as mp
import numpy as np
import scipy
from tqdm import tqdm
n_nodes = np.random.randn(10, 100)
num_combinations = int((int(n_nodes.shape[0]) ** 2) - int(n_nodes.shape[0]) // 2)
cpu_count = 8
cutoff=0.3
def node_combinations(nodes):
return itertools.combinations(list(range(len(nodes))), 2)
def edge_gen(xy_iterator: type(itertools.islice)):
edges = []
for cand in tqdm(xy_iterator, total=num_combinations//cpu_count)
if pearsonr(cand):
edges.append(cand)
def pearsonr(xy: tuple):
correlation_coefficient = scipy.stats.pearsonr(n_nodes[xy[0]], n_nodes[xy[1]])[0]
if correlation_coefficient >= cutoff:
return True
else:
return False
slices = more_itertools.distribute(cpu_count), node_combinations(n_nodes))
pool = mp.Pool(cpu_count)
results = pool.imap(edge_gen, slices)
pool.close()
pool.join()
推荐阅读
- java - 如何让nestedscrollview 可以滚动或不滚动?
- gesture - 如何平移然后立即用锤子js捏?
- angularjs - ngTableEventsChannel.onAfterDataFiltered 不起作用?
- sql - 如何添加列,然后使用过程更新 sql server 中的列
- css - React CSS Webpack Hashing & Usage with semantic-ui-sass, semantic-ui-react
- c# - 通过更改查询号更新整个列表
- continuous-integration - 有什么方法可以将 dotenv 与 Bitbucket Pipelines 一起使用?
- module - Julia:将模块拆分为多个文件
- php - html - 如何将选定的选项保存到数据库中
- java - 在 Java 中关闭 ExecutorService 的最佳方法