首页 > 解决方案 > 如何在 SQL/Spark/GraphFrames 中进行这种转换

问题描述

我有一个包含以下两列的表:

Device-Id    Account-Id
d1           a1   
d2           a1
d1           a2
d2           a3
d3           a4
d3           a5 
d4           a6
d1           a4

Device-Id 是安装我的应用程序的设备的唯一 ID,Account-Id 是用户帐户的 ID。一个用户可以拥有多个设备,并且可以在同一设备上创建多个帐户(例如,d1 设备设置了 a1、a2 和 a3 帐户)。

我想找到唯一的实际用户(应该在生成的表中表示为具有一些唯一 UUID 的新列)并且我正在寻找的转换生成下表:

Unique-User-Id    Devices-Used    Accounts-Used
uuid1             [d1, d2, d3]    [a1, a2, a3, a4, a5]   
uuid2             [d4]            [a6]

上面生成的表格背后的想法是,实际用户 uuid1 在他们的设备 d1 和 d2 上设置了一个帐户 a1,这实质上意味着这两个设备都属于 uuid 1,并且所有其他帐户都设置在这些 d1 和 d2 上设备也映射到同一个用户 uuid1。同样,d1 也有一个账户 a4,它也在 d3 上设置,所以 d3 也是 uuid1 的设备,它上面的每个账户都应该映射到 uuid1。

如何在 SQL/Spark/GraphFrames(通过 DataBricks)中实现上述转换,其中Device-Ids 和 Account-Ids 都可以是数百万

标签: sqlapache-sparkpysparkspark-graphxgraphframes

解决方案


我对这个解决方案并不感到自豪,因为我认为可能有一个更有效的解决方案,但无论如何我都会把它留在这里。希望能帮助到你

import org.apache.spark.sql.functions._

val flatten_distinct = (array_distinct _) compose (flatten _)

val df = Seq(
  ("d1","a1"),  
  ("d2","a1"),
  ("d1","a2"),
  ("d2","a3"),
  ("d3","a4"),
  ("d3","a5"),
  ("d4","a6")
).toDF("d_id","u_id")


val userDevices = df
  .groupBy("u_id")
  .agg(collect_list("d_id").alias("d_id_list"))

//+----+---------+
//|u_id|d_id_list|
//+----+---------+
//|  a5|     [d3]|
//|  a3|     [d2]|
//|  a4|     [d3]|
//|  a2|     [d1]|
//|  a1| [d1, d2]|
//|  a6|     [d4]|
//+----+---------+


val accountsByDevice = df
  .groupBy("d_id")
  .agg(collect_list("u_id").alias("u_id_list"))

//+----+---------+
//|d_id|u_id_list|
//+----+---------+
//|  d2| [a3, a1]|
//|  d3| [a4, a5]|
//|  d1| [a1, a2]|
//|  d4|     [a6]|
//+----+---------+


val ungroupedDf = userDevices
  .join(accountsByDevice, expr("array_contains(d_id_list,d_id)"))
  .groupBy("d_id_list")
  .agg(collect_set("u_id_list") as "set")
  .select(col("d_id_list") as "d_id", flatten_distinct(col("set")) as "u_id")
  .select(explode(col("d_id")) as "d_id", col("u_id"), size(col("u_id")) as "size")

//+----+------------+----+
//|d_id|        u_id|size|
//+----+------------+----+
//|  d2|    [a1, a3]|   2|
//|  d1|[a1, a3, a2]|   3|
//|  d2|[a1, a3, a2]|   3|
//|  d3|    [a4, a5]|   2|
//|  d1|    [a1, a2]|   2|
//|  d4|        [a6]|   1|
//+----+------------+----+


val finalDf = ungroupedDf
  .join(ungroupedDf.groupBy("d_id").agg(max(col("size")) as "size"), Seq("size","d_id"))
  .groupBy("u_id")
  .agg(collect_set("d_id") as "d_id")
  .withColumn("unique_id", monotonically_increasing_id())

//+------------+--------+-------------+
//|        u_id|    d_id|    unique_id|
//+------------+--------+-------------+
//|[a1, a2, a3]|[d1, d2]|1228360646656|
//|    [a4, a5]|    [d3]|1297080123392|
//|        [a6]|    [d4]|1520418422784|
//+------------+--------+-------------+

推荐阅读