首页 > 解决方案 > 使用表转换(Hive、scala、spark)修复层次结构数据

问题描述

我有一个处理分层数据的任务,但源数据包含层次结构中的错误,即:一些父子链接已损坏。我有一个重新建立这种连接的算法,但我还不能自己实现它。示例:初始数据为

+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1   |  1 | 2        |     1 |
| B1   |  2 | 3        |     2 |
| C1   | 18 | 4        |     3 |
| C2   |  3 | 5        |     3 |
| D1   |  4 | NULL     |     4 |
| D2   |  5 | NULL     |     4 |
| D3   | 10 | 11       |     4 |
| E1   | 11 | NULL     |     5 |
+------+----+----------+-------+

示意图如下: 在此处输入图像描述

如您所见,与 C1 和 D3 的连接在此处丢失。为了恢复连接,我需要为此表应用以下算法:

如果某些 NAME 的 ID 不在 PARENTID 列中(例如 ID = 18、10),则使用 LEVEL = (current LEVEL - 1) 和 PARENTID = (current ID) 创建具有“父级”的行,并获取 ID和 NAME 使得当前 ID < 来自上述 LEVEL 的节点的 ID。

结果必须是这样的:

+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1   |  1 | 2        |     1 |
| B1   |  2 | 3        |     2 |
| B1   |  2 | 18       |     2 |#
| C1   | 18 | 4        |     3 |
| C2   |  3 | 5        |     3 |
| C2   |  3 | 10       |     3 |#
| D1   |  4 | NULL     |     4 |
| D2   |  5 | NULL     |     4 |
| D3   | 10 | 11       |     4 |
| E1   | 11 | NULL     |     5 |
+------+----+----------+-------+

其中带有#的行 - 创建了新行。新模式如下所示:

在此处输入图像描述

关于如何在 spark/scala 中执行此算法有什么想法吗?谢谢!

标签: scalaapache-sparkhiveapache-spark-sqlhierarchy

解决方案


您可以从当前数据框构建一个createdRows数据框,将其与当前数据框联合以获得最终数据框。

createdRows您可以通过几个步骤构建此数据框:

  • 第一步是获取不在 PARENTID 列中的 ID(和 LEVEL)。您可以使用自左反连接来做到这一点。
  • 然后,将ID列重命名为PARENTID并更新LEVEL列,将其减少1.
  • 然后,通过将新行IDNAME列上的输入数据框连接起来,获取新行的LEVEL
  • 最后,你应用你的条件ID<PARENTID

您最终得到以下代码,dataframe是带有初始数据的数据框:

import org.apache.spark.sql.functions.col

val createdRows = dataframe
  // if for some NAME the ID is not in the PARENTID column (like ID = 18, 10)
  .select("LEVEL", "ID")
  .filter(col("LEVEL") > 1) // Remove root node from created rows
  .join(dataframe.select("PARENTID"), col("PARENTID") === col("ID"), "left_anti")
  // then create a row with a 'parent' with LEVEL = (current LEVEL - 1) and PARENTID = (current ID)
  .withColumnRenamed("ID", "PARENTID")
  .withColumn("LEVEL", col("LEVEL") - 1)
  // and take ID and NAME  
  .join(dataframe.select("NAME", "ID", "LEVEL"), Seq("LEVEL"))
  // such that the current ID < ID of the node from the LEVEL above.
  .filter(col("ID") < col("PARENTID"))

val result = dataframe
  .unionByName(createdRows)
  .orderBy("NAME", "PARENTID") // Optional, if you want an ordered result

result数据框中你得到:

+----+---+--------+-----+
|NAME|ID |PARENTID|LEVEL|
+----+---+--------+-----+
|A1  |1  |2       |1    |
|B1  |2  |3       |2    |
|B1  |2  |18      |2    |
|C1  |18 |4       |3    |
|C2  |3  |5       |3    |
|C2  |3  |10      |3    |
|D1  |4  |null    |4    |
|D2  |5  |null    |4    |
|D3  |10 |11      |4    |
|E1  |11 |null    |5    |
+----+---+--------+-----+

推荐阅读