scala - 使用表转换(Hive、scala、spark)修复层次结构数据
问题描述
我有一个处理分层数据的任务,但源数据包含层次结构中的错误,即:一些父子链接已损坏。我有一个重新建立这种连接的算法,但我还不能自己实现它。示例:初始数据为
+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1 | 1 | 2 | 1 |
| B1 | 2 | 3 | 2 |
| C1 | 18 | 4 | 3 |
| C2 | 3 | 5 | 3 |
| D1 | 4 | NULL | 4 |
| D2 | 5 | NULL | 4 |
| D3 | 10 | 11 | 4 |
| E1 | 11 | NULL | 5 |
+------+----+----------+-------+
如您所见,与 C1 和 D3 的连接在此处丢失。为了恢复连接,我需要为此表应用以下算法:
如果某些 NAME 的 ID 不在 PARENTID 列中(例如 ID = 18、10),则使用 LEVEL = (current LEVEL - 1) 和 PARENTID = (current ID) 创建具有“父级”的行,并获取 ID和 NAME 使得当前 ID < 来自上述 LEVEL 的节点的 ID。
结果必须是这样的:
+------+----+----------+-------+
| NAME | ID | PARENTID | LEVEL |
+------+----+----------+-------+
| A1 | 1 | 2 | 1 |
| B1 | 2 | 3 | 2 |
| B1 | 2 | 18 | 2 |#
| C1 | 18 | 4 | 3 |
| C2 | 3 | 5 | 3 |
| C2 | 3 | 10 | 3 |#
| D1 | 4 | NULL | 4 |
| D2 | 5 | NULL | 4 |
| D3 | 10 | 11 | 4 |
| E1 | 11 | NULL | 5 |
+------+----+----------+-------+
其中带有#的行 - 创建了新行。新模式如下所示:
关于如何在 spark/scala 中执行此算法有什么想法吗?谢谢!
解决方案
您可以从当前数据框构建一个createdRows
数据框,将其与当前数据框联合以获得最终数据框。
createdRows
您可以通过几个步骤构建此数据框:
- 第一步是获取不在 PARENTID 列中的 ID(和 LEVEL)。您可以使用自左反连接来做到这一点。
- 然后,将
ID
列重命名为PARENTID
并更新LEVEL
列,将其减少1
. - 然后,通过将新行
ID
与NAME
列上的输入数据框连接起来,获取新行的LEVEL
列 - 最后,你应用你的条件
ID
<PARENTID
您最终得到以下代码,dataframe
是带有初始数据的数据框:
import org.apache.spark.sql.functions.col
val createdRows = dataframe
// if for some NAME the ID is not in the PARENTID column (like ID = 18, 10)
.select("LEVEL", "ID")
.filter(col("LEVEL") > 1) // Remove root node from created rows
.join(dataframe.select("PARENTID"), col("PARENTID") === col("ID"), "left_anti")
// then create a row with a 'parent' with LEVEL = (current LEVEL - 1) and PARENTID = (current ID)
.withColumnRenamed("ID", "PARENTID")
.withColumn("LEVEL", col("LEVEL") - 1)
// and take ID and NAME
.join(dataframe.select("NAME", "ID", "LEVEL"), Seq("LEVEL"))
// such that the current ID < ID of the node from the LEVEL above.
.filter(col("ID") < col("PARENTID"))
val result = dataframe
.unionByName(createdRows)
.orderBy("NAME", "PARENTID") // Optional, if you want an ordered result
在result
数据框中你得到:
+----+---+--------+-----+
|NAME|ID |PARENTID|LEVEL|
+----+---+--------+-----+
|A1 |1 |2 |1 |
|B1 |2 |3 |2 |
|B1 |2 |18 |2 |
|C1 |18 |4 |3 |
|C2 |3 |5 |3 |
|C2 |3 |10 |3 |
|D1 |4 |null |4 |
|D2 |5 |null |4 |
|D3 |10 |11 |4 |
|E1 |11 |null |5 |
+----+---+--------+-----+
推荐阅读
- arduino - 更改 Arduino uno SD.h 上的 SPI 引脚
- python - 如何显示通过 Python (JayDeBeApi) 从 Dynamics 365 Online 中提取的行?
- pandas - 如何使用多索引分配给 Pandas.Series 中的新行?
- go - 通过 jenkins 部署 go 应用程序
- javascript - JQuery 自动关闭在运行时创建的 li
- android - React Native 的 Android 构建失败
- apache-spark - 使用 Google CloudDataproc 时是否还需要微调 spark 配置参数?
- reactjs - Webpack 构建抛出“.sourceMap 是 .sourceMaps 的别名,不能同时使用”
- java - JavaFX - 如何在场景顶部添加“图像”?
- c# - 在 ASP.net 中设置和 API(URL 问题?)