首页 > 解决方案 > 图中的并行计算

问题描述

我想对图进行一些分析,以找到图中所有节点对之间所有可能的简单路径。在Networkx库的帮助下,我可以使用 DFS 通过此功能查找 2 个节点之间的所有可能路径:

nx.all_simple_paths(G,source,target)

下面的代码在没有任何工作负载的情况下运行,因为我的玩具示例在图中仅包含 6 个节点。但是,在我的实际任务中,我的图包含 5,213 个节点和 11,377,786 条边,并且使用以下解决方案无法在该图中找到所有可能的简单路径:

import networkx as nx
graph = nx.DiGraph()
graph.add_weighted_edges_from(final_edges_list) 

list_of_nodes = list(graph.nodes())

paths = {}

for n1 in list_of_nodes:
    for n2 in list_of_nodes:
        if n1 != n2:
            all_simple_paths = list(nx.all_simple_paths(graph,n1,n2))
            paths[n1+ "-"+n2] = all_simple_paths

“路径”字典将“n1-n2”(分别为源节点和目标节点)作为键,并将所有简单路径的列表作为值。

问题是我是否可以在这种情况下使用多处理以便在我原来的问题上运行此代码。我对处理器、线程、共享内存和 CPU 内核的了解非常幼稚,我不确定我是否真的可以在我的任务中使用并发(并行运行我的嵌套循环)。我使用具有 128 GB RAM 和 32 核 CPU 的 Windows 服务器。

PS:彻底搜索网络(主要是 StackOverFlow),我找到了推荐使用线程的解决方案,其他人推荐使用多处理。我不确定我是否理解这两者之间的区别:|

标签: pythongraphmultiprocessing

解决方案


如果要使用线程,请使用线程池执行程序将函数调用提交给线程。它将返回一个未来对象。Future.result() 将返回调用返回的值。如果调用尚未完成,则此方法将等待超时秒。如果在此之前调用未完成,它将引发 TimeoutError。

with ThreadPoolExecutor() as executor:
    for n1 in list_of_nodes:
        for n2 in list_of_nodes:
            if n1 != n2:
                all_simple_paths_futures = executor.submit(nx.all_simple_paths, graph,n1,n2)
            paths[n1+ "-"+n2] = all_simple_paths_futures
try:            
    for key in paths.keys():
        # get back  results from thread
        future_obj = paths[key]
        paths[key]= list(future_obj.result())
except Exception as e:
    print(e)
    raise e

有关多处理和线程之间的区别,请查看此链接:Multiprocessing vs Threading Python


推荐阅读