scala - 使用嵌套列加入两个 spark Dataframe 并更新其中一列
问题描述
我正在处理一些要求,其中我从 CSV 文件中获取一个小表,如下所示:
root
|-- ACCT_NO: string (nullable = true)
|-- SUBID: integer (nullable = true)
|-- MCODE: string (nullable = true)
|-- NewClosedDate: timestamp (nullable = true
我们还有一个非常大的 Avro 形式的外部配置单元表,它存储在 HDFS 中,如下所示:
root
-- accountlinks: array (nullable = true)
| | |-- account: struct (nullable = true)
| | | |-- acctno: string (nullable = true)
| | | |-- subid: string (nullable = true)
| | | |-- mcode: string (nullable = true)
| | | |-- openeddate: string (nullable = true)
| | | |-- closeddate: string (nullable = true)
现在,要求是根据 csv 文件中的三列查找外部配置单元表:ACCT_NO - SUBID - MCODE
. 如果匹配,则accountlinks.account.closeddate
使用NewClosedDate
CSV 文件更新。
我已经编写了以下代码来分解所需的列并将其与小表连接,但我不确定如何使用 NewClosedDate 更新 closeddate 字段(对于所有帐户持有人而言,这当前为空),因为 closeddate 是一个嵌套列和我不能轻易使用 withColumn 来填充它。除此之外,无法更改架构和列名,因为这些文件链接到某些外部配置单元表。
val df = spark.sql("select * from db.table where archive='201711'")
val ExtractedColumn = df
.coalesce(150)
.withColumn("ACCT_NO", explode($"accountlinks.account.acctno"))
.withColumn("SUBID", explode($"accountlinks.account.acctsubid"))
.withColumn("MCODE", explode($"C.mcode"))
val ReferenceData = spark.read.format("csv")
.option("header","true")
.option("inferSchema","true")
.load("file.csv")
val FinalData = ExtractedColumn.join(ReferenceData, Seq("ACCT_NO","SUBID","MCODE") , "left")
解决方案
您只需要分解accountlinks
数组,然后像这样加入 2 个数据框:
val explodedDF = df.withColumn("account", explode($"accountlinks"))
val joinCondition = $"ACCT_NO" === $"account.acctno" && $"SUBID" === $"account.subid" && $"MCODE" === $"account.mcode"
val joinDF = explodedDF.join(ReferenceData, joinCondition, "left")
现在您可以account
像下面那样更新结构列,并收集列表以获取数组结构:
val FinalData = joinDF.withColumn("account",
struct($"account.acctno", $"account.subid", $"account.mcode",
$"account.openeddate", $"NewClosedDate".alias("closeddate")
)
)
.groupBy().agg(collect_list($"account").alias("accountlinks"))
这个想法是创建一个新结构,其中包含 account
除closedate
从NewCloseDate
列中获得的所有字段。
如果结构包含许多字段,您可以使用 for-comprehension 来获取除结束日期之外的所有字段,以防止全部输入。
推荐阅读
- razor - 带有延迟加载的 Sitecore GlassMapper RenderImage
- graphql - 如何从 knex 插件中取回 ID?
- android - Moshi 自定义字段不适用于序列化
- dictionary - 如何在 main.ml 文件中使用此字典(创建字典、添加元素、删除 ....)?
- phpstorm - PhpStorm 从代码选择生成模板
- python - 将函数分配给类而不调用此函数?
- javascript - 如何根据我在 javascript 中的输入设置显示不同内容的弹出窗口
- c# - 将 dataType[] 替换为 List
- html - 有没有办法在影子 dom 中选择实际的第一个孩子?
- java - Spring Data JPA - 是否可以在将实体持久保存在存储库中之前使用主体更新实体?