python - 如何使用多处理并行使用 python gneerator?
问题描述
如何提高 networkx 函数的性能local_bridges
https://networkx.org/documentation/stable//reference/algorithms/generated/networkx.algorithms.bridges.local_bridges.html#networkx.algorithms.bridges.local_bridges
我已经尝试过使用 pypy - 但到目前为止,我仍然坚持在单核上使用生成器。我的图有 300k 边。一个例子:
# construct the nx Graph:
import networkx as nx
# construct an undirected graph here - this is just a dummy graph
G = nx.cycle_graph(300000)
# fast - as it only returns an generator/iterator
lb = nx.local_bridges(G)
# individual item is also fast
%%time
next(lb)
CPU times: user 1.01 s, sys: 11 ms, total: 1.02 s
Wall time: 1.02 s
# computing all the values is very slow.
lb_list = list(lb)
如何并行使用此迭代器以利用所有处理器内核?当前的幼稚实现仅使用单核!
我天真的多线程第一次尝试是:
import multiprocessing as mp
lb = nx.local_bridges(G)
pool = mp.Pool()
lb_list = list(pool.map((), lb))
但是,我不想应用特定的函数——()
而只是next
从迭代器中并行获取元素。
编辑
我想它归结为如何并行化:
lb_res = []
lb = nx.local_bridges(G)
for node in range(1, len(G) +1):
lb_res.append(next(lb))
lb_res
天真地使用多处理显然失败了:
# from multiprocessing import Pool
# https://stackoverflow.com/questions/41385708/multiprocessing-example-giving-attributeerror
from multiprocess import Pool
lb_res = []
lb = nx.local_bridges(G)
def my_function(thing):
return next(thing)
with Pool(5) as p:
parallel_result = p.map(my_function, range(1, len(G) +1))
parallel_result
但我不清楚如何将生成器作为参数传递给 map 函数 - 并完全使用生成器。
编辑 2
对于这个特定问题,事实证明瓶颈是with_span=True
参数的最短路径计算。禁用时,它的速度相当快。
当需要计算跨度时,我建议cugraph
在 GPU 上快速实现 SSSP。尽管如此,对边集的迭代不会并行发生,应该进一步改进。
但是,要了解更多信息,我有兴趣了解如何在 python 中并行化生成器的消耗。
解决方案
您不能并行使用生成器,每个非平凡生成器的下一个状态都由其当前状态决定。您必须next()
按顺序调用。
从https://github.com/networkx/networkx/blob/master/networkx/algorithms/bridges.py#L162这就是函数的实现方式
for u, v in G.edges:
if not (set(G[u]) & set(G[v])):
yield u, v
所以你可以使用类似这样的东西来并行化它,但是你将不得不承担使用类似multiprocessing.Manager
. 我认为这只会让整个事情变得更慢,但你可以自己计时。
def process_edge(e):
u, v = e
lb_list = []
if not (set(G[u]) & set(G[v])):
lb_list.append((u,v))
with Pool(os.cpu_count()) as pool:
pool.map(process_edge, G.edges)
另一种方法是将图形拆分为顶点范围并同时处理它们。
def process_nodes(nodes):
lb_list = []
for u in nodes:
for v in G[u]:
if not (set(G[u]) & set(G[v])):
lb_list.append((u,v))
with Pool(os.cpu_count()) as pool:
pool.map(process_nodes, np.array_split(list(range(G.number_of_nodes())),
os.cpu_count()))
也许您还可以检查是否存在针对此问题的更好算法。或者找到一个用 C 实现的更快的库。
推荐阅读
- java - 过滤器列表包含多个对象java
- c# - Type name "OleDbDataAdapter" could not be found in the namespace "System.Data.OleDb"
- ios - 与 UIKit PKCanvasView 下方的 SwiftUI 视图交互
- c - 你能帮我吗,在第 6 和第 7 个打印语句中递增是如何工作的
- discord.js - 设置后如何清除我的 Discord 机器人的状态?
- java - 如何在对 Spring Boot 的 GET 请求中格式化对象
- pine-script - Pine Editor 使用 plot() 为最后的 X 条创建一条恒定线
- consul - 使用 Ocelot 的 Consul 集群未使用所有服务
- c - 在 c 中找到 N 的 N 个不同因子
- python - oauth2client.client.HttpAccessTokenRefreshError: invalid_grant: 令牌已过期或撤销