scala - 如何对结构元素进行分组并将其转换回具有相同架构的结构
问题描述
Spark 2.4.5 在我的数据框中,我有一个结构数组,该数组不时保存一个字段的快照。
现在,我正在寻找一种在数据发生更改时仅拥有快照的方法。
我的架构如下
root
|-- fee: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- updated_at: long (nullable = true)
| | |-- fee: float (nullable = true)
|-- status: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- updated_at: long (nullable = true)
| | |-- status: string (nullable = true)
现有输出:
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
|fee |status |
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
|[[1584579671000, 12.11], [1584579672000, 12.11], [1584579673000, 12.11]]|[[1584579671000, Closed-A], [1584579672000, Closed-A], [1584579673000, Closed-B], [1584579674000, Closed], [1584579675000, Closed-A]]|
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
由于“费用”列没有改变,所以它应该只有一个条目因为状态已经改变了几次,所以 o/p 将是 [[1584579671000, Closed-A], [1584579673000, Closed-B], [1584579674000 , Closed], [1584579675000, Closed-A]] 注意这里的状态“Closed-A”出现了两次。
试图获得以下输出:
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
|fee |status |
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
|[[1584579671000, 12.11]]|[[1584579671000, Closed-A], [1584579673000, Closed-B], [1584579674000, Closed], [1584579675000, Closed-A]]|
+------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------+
注意:尽量不要有用户定义的函数。
解决方案
使用 Spark Dataframe API 可以解决上述问题:添加一个单调递增的 id 以唯一标识每条记录,分解和展平数据框,按费用和状态分别分组(根据要求),按 id 聚合分组的数据农场以收集结构,使用 id 连接两个数据框,id 可以放入最终的数据农场。
import org.apache.spark.sql.functions.explode
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.monotonically_increasing_id
import org.apache.spark.sql.functions.collect_list
import org.apache.spark.sql.functions.struct
val idDF = df.withColumn("id", monotonically_increasing_id)
val explodeDf = idDF
.select(col("id"), col("status"), explode(col("fee")).as("fee"))
.select(col("id"), col("fee"), explode(col("status")).as("status"))
val flatDF = explodeDf.select(col("id"), col("fee.fee"), col("fee.updated_at").as("updated_at_fee"), col("status.status"), col("status.updated_at").as("updated_at_status"))
val feeDF = flatDF.groupBy("id", "fee").min("updated_at_fee")
val feeSelectDF = feeDF.select(col("id"), col("fee"), col("min(updated_at_fee)").as("updated_at"))
val feeAggDF = feeSelectDF.groupBy("id").agg(collect_list(struct("fee", "updated_at")).as("fee"))
val statusDF = flatDF.groupBy("id", "status").min("updated_at_status")
val statusSelectDF = statusDF.select(col("id"), col("status"), col("min(updated_at_status)").as("updated_at"))
val statusAggDF = statusSelectDF.groupBy("id").agg(collect_list(struct("status", "updated_at")).as("status"))
val finalDF = feeAggDF.join(statusAggDF, "id")
finalDF.show(10)
finalDF.printSchema()
推荐阅读
- c# - Swagger UI 没有正确获取承载参数
- export-to-csv - 如何在grafana中关闭时间序列数据表中的数据格式?
- reactjs - 无法在 react.js 中使用 axios.patch()
- react-admin - 在 react-admin 中替换 FileInput 中的删除图标
- mysql - Mysql查询获取一列没有其他值的行
- spring - 为什么@SpringBootTest 在构造函数注入中需要@Autowired
- java - 我无法在 windowbuilder 中编辑我的代码,也无法删除 JTable,它一直显示旧的设计控制台
- scala - Scalacache 从使用 memoizeF 设置的缓存中删除
- java - 如何在代码中在 Spring Boot 中配置 Flyway
- android - Flutter AR - 绘制平面