首页 > 解决方案 > SQL/Python - 递归加入 id 列

问题描述

我有一个包含 2 列的 SQL 表 - a_id 和 b_id(a_id 与 b_id 具有多对多关系)。我正在尝试将所有 a_ids 与所有链接的 a_ids 和 b_ids 连接起来。例如

援助 出价
一个 1
2
C 2
一个 3
D 3

应该导致:

[A,D] : [1,3]

[B,C]:[2]

最终格式是什么并不重要,因为我可以将其旋转/分解以再次变平。

我目前正在使用 while 循环在 b_id 上迭代地将数据连接到自身,然后按 a_id 分组并将所有相应的 a_ids 和 b_ids 连接到一个数组中,然后展平数组并再次连接等。因为我有数百万行数据代码需要几个小时才能运行,我想知道是否有更快的方法可以使用 SQL 和 python 来执行此操作。这是我目前使用的代码:

def join_to_self():

        connection.execute('''
            CREATE OR REPLACE TABLE joined_data as
                (
            SELECT DISTINCT
                       j.b_id as b_id,
                       j.a_id as a_id1,
                       jo.a_id as a_id2
            from temp_data as j
                full outer join temp_data as jo
                on j.b_id = jo.b_id );''')


def find_matches():

    connection.execute('''CREATE OR REPLACE TABLE temp_data as
            (
            with agg_matches as(
            SELECT
                    g.a_id1,
                    array_agg(distinct g.b_id) as b_ids,
                    array_agg(distinct g.a_id2) as a_id2
            FROM joined_data g
            GROUP BY g.a_id1
                  )
            SELECT DISTINCT
                   cast(A.value as varchar) as a_id,
                   cast(B.value as int) AS b_id
            FROM   agg_matches,
                   Table(Flatten(agg_matches.b_ids)) B,
                   Table(Flatten(agg_matches.a_id2 )) A);''')

    number_rows = connection.execute(''SELECT COUNT(*) FROM  joined_data'').fetchone()

   
return number_rows[0]

    
join_to_self()

stop = True

while stop:

    previous_rows = find_matches()

    join_to_self()
    current_rows = find_matches()

    if previous_rows == current_rows:
        stop = False

标签: pythonsqlgraph-theorysnowflake-cloud-data-platformrecursive-query

解决方案


识别点簇是一个图行走问题,在 SQL 中需要递归 CTE。您将首先生成所有边,然后在跟踪已访问边的同时遍历图形,最后通过最小的公共边识别每个组:

with 
    edges as (
        select t1.a_id as a_id1, t2.a_id as a_id2
        from temp_data t1
        inner join temp_data t2 on t1.b_id = t2.b_id
    ),
    cte as (
        select a_id1, a_id2, array_construct(a_id1) as visited 
        from edges 
        where a_id1 = a_id2
        union all
        select c.a_id1, e.a_id2, array_append(c.visited, e.a_id2)
        from cte c
        inner join edges e on e.a_id1 = c.a_id2
        where not array_contains(e.a_id2, c.visited)
    )
select a_id1, dense_rank() over(order by min(a_id2)) as grp
from cte
group by a_id1
order by grp, a_id1

这会将每个a_id以及它所属的组放在一个单独的行中。如果您希望将结果作为数组以及对应的 te b_id,您可以将外部查询更改为:

with ...
select array_agg(distinct t.a_id) as a_ids, array_agg(distinct t.b_id) as b_ids
from temp_data t
inner join (
    select a_id1, dense_rank() over(order by min(a_id2)) as grp
    from cte
    group by a_id1
) x on x.a_id1 = t.a_id
group by x.grp

推荐阅读