首页 > 解决方案 > 在 Python 中将无向循环图 (UCG) 转换为有向无环图 (DAG) 的最快方法?

问题描述

假设我有一个无向循环图(UCG)。所有边的权重都是1。因此,这个UCG可以用一个邻接矩阵来表示A

import numpy as np

A = np.array([[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1],
              [1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0],
              [1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1],
              [1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1],
              [1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0],
              [0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0],
              [1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0],
              [1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0],
              [0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
              [0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
              [1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]])

为了可视化 UCG,我们可以简单地将其转换为networkx.Graph对象

import networkx as nx

ucg = nx.Graph()
rows, cols = np.where(A == 1)
edges = zip(rows.tolist(), cols.tolist())
ucg.add_edges_from(edges)

UCG 看起来像这样: 在此处输入图像描述

我用不同的颜色为节点着色以显示“最小距离”。橙色节点{8, 9, 10}是起始节点,绿色节点{0, 1, 2, 3}是与起始节点的最小距离为 1 的节点,蓝色节点{4, 5, 6, 7}的最小距离为 2。现在我想将其转换为有向无环图(DAG ) 箭头从起始节点指向距离为 1 的节点到距离为 2 的节点,依此类推。具有相同“最小距离”的节点之间的边被丢弃。

预期的输出是一个表示 DAG 的字典:

d = {8: {1, 3},
     9: {1, 2},
     10: {0, 2, 3},
     0: {4, 6, 7},
     1: {5, 6, 7},
     2: {4, 5, 6},
     3: {4, 5, 7}}

同样,为了可视化 DAG,我们可以将其转换为networkx.DiGraph对象

dag = nx.DiGraph()
dag.add_edges_from([(k, v) for k, vs in d.items() in for v in vs])

预期的输出 DAG 如下所示: 在此处输入图像描述

我想编写一个高效且通用的代码,将具有给定起始节点的给定 UCG 转换为相应的 DAG。

我试过的

显然,需要递归。我的想法是使用 BFS 方法找到每个起始节点的 1 距离节点,然后找到它们的 1 距离节点,然后递归继续下去。所有访问过的节点都存储在一个集合prev_starts中以避免倒退。下面是我的代码

from collections import defaultdict

def ucg2dag(A, starts):
    """Takes the adjacency matrix of a UCG and the indices of the
    starting nodes, returns the dictionary of a DAG."""

    def recur(starts):
        starts = list(set(starts))
        idxs, nbrs = np.where(A[starts] == 1)
        prev_starts.update(starts)

        # Filter out the neighbors that are previous starts so the
        # arrows do not point backwards
        try:
            idxs, nbrs = zip(*((idx, nbr) for idx, nbr in zip(idxs, nbrs)
                                if nbr not in prev_starts))
        # Terminate if every neighbor is a previous start.
        except:
            return d

        for idx, nbr in zip(idxs, nbrs):
            d[starts[idx]].add(nbr)

        return recur(starts=nbrs)

    prev_starts = set()
    d = defaultdict(set)
    return recur(starts)

测试我的代码:

d = ucg2dag(A, starts={8, 9, 10})
print(d)

编辑:在感谢@trincot的评论return之前添加之后,我能够得到正确的输出:recur

defaultdict(<class 'set'>, 
            {8: {1, 3}, 
             9: {1, 2}, 
             10: {0, 2, 3}, 
             0: {4, 6, 7}, 
             1: {5, 6, 7}, 
             2: {4, 5, 6}, 
             3: {4, 5, 7}})
%timeit 37.6 µs ± 591 ns per loop (mean ± std. dev. of 7 runs, 10000 loops each)

实际上,我有一个更大的图表。我想知道是否有更有效的算法?

标签: pythonalgorithmrecursiongraphnetworkx

解决方案


您已经对您的代码应用了一些修复(部分基于注释),因此您现在有了工作代码。

剩下的仅有的几句话是:

  • BFS 通常不是递归算法(与 DFS 相比):您拥有的递归是尾递归的情况。在这种情况下,它可以写成一个循环,你会避免使用堆栈。

  • 很遗憾,您必须在邻接矩阵中查找边缘。最好先将邻接矩阵转换为邻接列表,除非图真的很密集。

  • 输出可以是邻接列表,每个节点都有一个条目,这样它就可以是列表列表而不是字典

  • 结构的重复转换使用zip可能不是最有效的(虽然我没有基准测试)

如果不使用 numpy,它可能看起来像这样:

def ucg2dag(adj_matrix, starts):
    adj_list = [
        [target for target, is_connected in enumerate(row) if is_connected]
            for row in adj_matrix
    ]

    frontier = starts

    dag = [[] for _ in range(len(adj_list))]

    while frontier:
        for source in frontier:
            dag[source].extend(target for target in adj_list[source] if not target in starts)
        frontier = set(target 
            for source in frontier for target in adj_list[source] if not target in starts
        )
        starts.update(frontier)

    return dag

示例运行:

adj_matrix = [[0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 1],
              [1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0],
              [1, 1, 0, 1, 1, 1, 1, 0, 0, 1, 1],
              [1, 1, 1, 0, 1, 1, 0, 1, 1, 0, 1],
              [1, 0, 1, 1, 0, 1, 1, 1, 0, 0, 0],
              [0, 1, 1, 1, 1, 0, 1, 1, 0, 0, 0],
              [1, 1, 1, 0, 1, 1, 0, 1, 0, 0, 0],
              [1, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0],
              [0, 1, 0, 1, 0, 0, 0, 0, 0, 0, 0],
              [0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0],
              [1, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0]]

dag = ucg2dag(adj_matrix, {8, 9, 10})
print(dag)  

示例运行的输出:

[[4, 6, 7], [5, 6, 7], [4, 5, 6], [4, 5, 7], [], [], [], [], [1, 3], [1, 2], [0, 2, 3]]

推荐阅读