How to group struct elements and convert them back to a struct with the same schema

Problem description

Spark 2.4.5. In my dataframe I have an array of structs, and the array holds a snapshot of a field taken from time to time.

Now I am looking for a way to keep a snapshot only when the data has actually changed.

My schema is as follows:

root 
 |-- fee: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- updated_at: long (nullable = true)
 |    |    |-- fee: float (nullable = true)
 |-- status: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- updated_at: long (nullable = true)
 |    |    |-- status: string (nullable = true)
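
For anyone who wants to reproduce the problem, here is a minimal sketch that builds a dataframe with this schema from the sample values shown below. The case class names FeeSnap/StatusSnap and the session setup are illustrative, not part of the original post:

import org.apache.spark.sql.SparkSession

// Illustrative case classes mirroring the element structs of the schema above.
case class FeeSnap(updated_at: Long, fee: Float)
case class StatusSnap(updated_at: Long, status: String)

val spark = SparkSession.builder.master("local[*]").appName("snapshots").getOrCreate()
import spark.implicits._

// Sample values taken from the "Current output" table below.
val df = Seq((
  Seq(FeeSnap(1584579671000L, 12.11f), FeeSnap(1584579672000L, 12.11f), FeeSnap(1584579673000L, 12.11f)),
  Seq(StatusSnap(1584579671000L, "Closed-A"), StatusSnap(1584579672000L, "Closed-A"),
    StatusSnap(1584579673000L, "Closed-B"), StatusSnap(1584579674000L, "Closed"),
    StatusSnap(1584579675000L, "Closed-A"))
)).toDF("fee", "status")

df.printSchema() // matches the schema above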

Current output:

+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------+
|fee                                                                     |status                                                                                                                               |
+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------+
|[[1584579671000, 12.11], [1584579672000, 12.11], [1584579673000, 12.11]]|[[1584579671000, Closed-A], [1584579672000, Closed-A], [1584579673000, Closed-B], [1584579674000, Closed], [1584579675000, Closed-A]]|
+------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------+

Since the "fee" column never changed, it should end up with only a single entry. Because the status changed several times, the output should be [[1584579671000, Closed-A], [1584579673000, Closed-B], [1584579674000, Closed], [1584579675000, Closed-A]]. Note that the status "Closed-A" appears twice here, because the value changed away and then came back.

Desired output:

+------------------------+----------------------------------------------------------------------------------------------------------+
|fee                     |status                                                                                                    |
+------------------------+----------------------------------------------------------------------------------------------------------+
|[[1584579671000, 12.11]]|[[1584579671000, Closed-A], [1584579673000, Closed-B], [1584579674000, Closed], [1584579675000, Closed-A]]|
+------------------------+----------------------------------------------------------------------------------------------------------+

Note: preferably without user-defined functions.

Tags: scala, apache-spark, apache-spark-sql, spark-streaming

Solution


The above problem can be solved with the Spark DataFrame API alone:

1. Add a monotonically increasing id to uniquely identify each record.
2. Explode and flatten the dataframe.
3. Group by fee and by status separately (as required), taking the earliest timestamp per distinct value.
4. Aggregate each grouped dataframe by id to collect the structs back into arrays.
5. Join the two dataframes on the id, which carries through into the final dataframe.

import org.apache.spark.sql.functions.{col, collect_list, explode, monotonically_increasing_id, struct}

// Tag every input row with a unique id so the arrays can be reassembled per record later.
val idDF = df.withColumn("id", monotonically_increasing_id())

// Explode both arrays; this yields one row per (fee element, status element) pair.
val explodeDf = idDF
  .select(col("id"), col("status"), explode(col("fee")).as("fee"))
  .select(col("id"), col("fee"), explode(col("status")).as("status"))

// Flatten the struct fields into plain columns.
val flatDF = explodeDf.select(
  col("id"),
  col("fee.fee"),
  col("fee.updated_at").as("updated_at_fee"),
  col("status.status"),
  col("status.updated_at").as("updated_at_status"))

// Collapse duplicate fee values, keeping the earliest timestamp per distinct value.
val feeDF = flatDF.groupBy("id", "fee").min("updated_at_fee")
val feeSelectDF = feeDF.select(col("id"), col("fee"), col("min(updated_at_fee)").as("updated_at"))
// Rebuild the array of structs; updated_at comes first to match the original schema.
val feeAggDF = feeSelectDF.groupBy("id").agg(collect_list(struct("updated_at", "fee")).as("fee"))


// Same for status: one entry per distinct value, with its earliest timestamp.
val statusDF = flatDF.groupBy("id", "status").min("updated_at_status")
val statusSelectDF = statusDF.select(col("id"), col("status"), col("min(updated_at_status)").as("updated_at"))
val statusAggDF = statusSelectDF.groupBy("id").agg(collect_list(struct("updated_at", "status")).as("status"))

// Join the two halves back together on the id.
val finalDF = feeAggDF.join(statusAggDF, "id")
finalDF.show(10, false)
finalDF.printSchema()
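
One caveat with the grouping step: because it groups by the value itself, a value that changes away and then comes back (Closed-A in the desired output) collapses into a single entry with its earliest timestamp. If re-occurrences must survive, a window with lag over flatDF can keep only the rows where the value differs from the previous snapshot. This is a sketch, with statusWindow and statusChangesDF as illustrative names, not part of the original answer:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lag, sort_array}

// Order the status snapshots of each record by timestamp and compare with the predecessor.
val statusWindow = Window.partitionBy("id").orderBy("updated_at_status")

val statusChangesDF = flatDF
  .select("id", "status", "updated_at_status")
  .distinct() // the double explode duplicated every status row once per fee row
  .withColumn("prev_status", lag("status", 1).over(statusWindow))
  // Keep the first snapshot and every snapshot whose value differs from the previous one.
  .filter(col("prev_status").isNull || col("prev_status") =!= col("status"))
  .groupBy("id")
  // sort_array restores timestamp order, since updated_at is the first struct field.
  .agg(sort_array(collect_list(struct(col("updated_at_status").as("updated_at"), col("status")))).as("status"))

statusChangesDF.show(10, false)

The same pattern would apply to the fee column, and the join on id stays as above.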
