Parallelizing heavy processing on graphs with Python

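Question

I am working on graphs and large datasets in complex networks. I run the SIR algorithm on them using the ndlib library. But each iteration takes about 1 second, so the code needs 10-12 hours to finish. Is there a way to parallelize it? The code looks like the following.

This line is the core: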

sir = model.infected_SIR_MODEL(it, infectionList, False)

Is there a simple way to run it with multiple threads or in parallel?

count = 500
for i in numpy.arange(1, count, 1):

    for it in model.get_nodes():

        sir = model.infected_SIR_MODEL(it, infectionList, False)

Each iteration:

for u in self.graph.nodes():
    u_status = self.status[u]
    eventp = np.random.random_sample()
    neighbors = self.graph.neighbors(u)
    if isinstance(self.graph, nx.DiGraph):
        neighbors = self.graph.predecessors(u)

    if u_status == 0:
        infected_neighbors = len([v for v in neighbors if self.status[v] == 1])
        if eventp < self.BetaList[u] * infected_neighbors:
            actual_status[u] = 1
    elif u_status == 1:
        if eventp < self.params['model']['gamma']:
            actual_status[u] = 2

Tags: python, python-3.x, graph, parallel-processing

Answer


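So, if the iterations are independent, I don't see the point of iterating count=500 times. Either way, you may be interested in the multiprocessing library.

I prepared 2 stub solutions (i.e. adapt them to your exact needs). The first expects every input to be static (as far as I understand the OP's question, the results differ between iterations only because a random state is generated in each one). With the second, you can update the input data between iterations of i. I have not tried the code because I don't have the model, so it may not work as-is.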

import multiprocessing as mp


# if everything is independent (eg. "infectionList" is static and does not change during the iterations)

def worker(model, infectionList):
    sirs = []
    for it in model.get_nodes():
        sir = model.infected_SIR_MODEL(it, infectionList, False)
        sirs.append(sir)
    return sirs

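# the guard is needed on platforms that spawn worker processes (e.g. Windows, macOS)
if __name__ == "__main__":
    count = 500
    infectionList = []
    model = "YOUR MODEL INSTANCE"

    data = [(model, infectionList) for _ in range(1, count+1)]
    with mp.Pool() as pool:
        results = pool.starmap(worker, data)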
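
A caveat for this first variant: if each run draws random numbers, it is safer to give every run its own seed. Worker processes created by fork inherit the parent's NumPy global random state, so without reseeding, different workers can replay the same random sequence. The sketch below uses only the standard library to show the idea. The toy graph `GRAPH`, the rates `BETA` and `GAMMA`, and the functions `run_sir` and `run_many` are hypothetical stand-ins, not ndlib's API.

```python
import multiprocessing as mp
import random

# Hypothetical toy graph as an adjacency dict; stands in for the real network.
GRAPH = {0: [1, 2], 1: [0, 2, 3], 2: [0, 1, 4], 3: [1, 4], 4: [2, 3]}
BETA, GAMMA = 0.3, 0.1  # assumed infection and recovery probabilities


def run_sir(seed, steps=50, initial_infected=(0,)):
    """Run one independent toy SIR simulation; return how many nodes were ever infected."""
    rng = random.Random(seed)          # private generator: one seed per run
    status = {u: 0 for u in GRAPH}     # 0 = susceptible, 1 = infected, 2 = removed
    for u in initial_infected:
        status[u] = 1
    for _ in range(steps):
        new_status = dict(status)      # synchronous update, as in the question's loop
        for u in GRAPH:
            eventp = rng.random()
            if status[u] == 0:
                infected = sum(1 for v in GRAPH[u] if status[v] == 1)
                if eventp < BETA * infected:
                    new_status[u] = 1
            elif status[u] == 1 and eventp < GAMMA:
                new_status[u] = 2
        status = new_status
    return sum(1 for s in status.values() if s != 0)


def run_many(seeds, processes=None):
    """Run one simulation per seed in a process pool; results keep the order of seeds."""
    with mp.Pool(processes) as pool:
        return pool.map(run_sir, list(seeds))
```

Calling `run_many(range(500))` from under an `if __name__ == "__main__":` guard would give 500 runs that can be reproduced from their seeds, regardless of which worker happens to execute which run.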

Second proposed solution, for the case where "infectionList" or something else gets updated in each iteration of "i":

def worker2(model, it, infectionList):
    sir = model.infected_SIR_MODEL(it, infectionList, False)
    return sir

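# the guard is needed on platforms that spawn worker processes (e.g. Windows, macOS)
if __name__ == "__main__":
    with mp.Pool() as pool:
        for i in range(1, count+1):
            data = [(model, it, infectionList) for it in model.get_nodes()]
            results = pool.starmap(worker2, data)

            # process results, update the inputs, go to the next iteration....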
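
One cost to keep in mind with this second variant: `model` is pickled and sent along with every task, once per node per iteration. A possible workaround, sketched here under assumptions, is to hand the static graph to each worker once through the `initializer` argument of `mp.Pool` and to send only the changing status plus a chunk of nodes per task. `init_worker`, `step_chunk`, `chunked` and `parallel_step` are hypothetical helpers, and a plain adjacency dict stands in for the real graph.

```python
import multiprocessing as mp
import random

_GRAPH = None  # per-process copy of the static graph, set once by init_worker


def init_worker(graph):
    """Pool initializer: store the graph in the worker so tasks need not resend it."""
    global _GRAPH
    _GRAPH = graph


def step_chunk(nodes, status, beta, gamma, seed):
    """Compute status changes for the given nodes from the current status of all nodes."""
    rng = random.Random(seed)
    updates = {}
    for u in nodes:
        eventp = rng.random()
        if status[u] == 0:
            infected = sum(1 for v in _GRAPH[u] if status[v] == 1)
            if eventp < beta * infected:
                updates[u] = 1
        elif status[u] == 1 and eventp < gamma:
            updates[u] = 2
    return updates


def chunked(items, size):
    """Split a list into consecutive pieces of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def parallel_step(pool, nodes, status, beta, gamma, step, chunk_size=1000):
    """Advance all nodes by one synchronous step using an existing pool."""
    chunks = chunked(nodes, chunk_size)
    # Distinct seed per (step, chunk) so no two tasks share a random stream.
    tasks = [(c, status, beta, gamma, step * len(chunks) + k)
             for k, c in enumerate(chunks)]
    new_status = dict(status)
    for updates in pool.starmap(step_chunk, tasks):
        new_status.update(updates)
    return new_status
```

With a pool created once as `mp.Pool(initializer=init_worker, initargs=(graph,))` before the loop over `i`, each iteration would call `parallel_step(pool, nodes, status, beta, gamma, i)`. The `status` dict is still sent with every chunk, so whether this beats the serial loop depends on how expensive the per-node work is compared with that transfer; it is worth timing on a small graph first.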

Edit: updated the answer to separate the two proposals more clearly.

