首页 > 解决方案 > 在scala数据框中合并地图

问题描述

我有一个包含 col1、col2、col3 列的数据框。col1,col2 是字符串。col3 是下面定义的 Map[String,String]

 |-- col3: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

我已按 col1、col2 分组并使用 collect_list 聚合以获取地图数组并存储在 col4 中。

 df.groupBy($"col1", $"col2").agg(collect_list($"col3").as("col4"))

 |-- col4: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)

但是,我想将 col4 作为单个地图,并将所有地图组合在一起。目前我有:

[[a->a1,b->b1],[c->c1]]

预期产出

[a->a1,b->b1,c->c1]

使用 udf 会是理想的吗?

任何帮助表示赞赏。谢谢。

标签: scaladataframeapache-sparkuser-defined-functionsscala-collections

解决方案


您可以使用聚合map_concat

import org.apache.spark.sql.functions.{expr, collect_list}

val df = Seq(
  (1, Map("k1" -> "v1", "k2" -> "v3")),
  (1, Map("k3" -> "v3")),
  (2, Map("k4" -> "v4")),
  (2, Map("k6" -> "v6", "k5" -> "v5"))
).toDF("id", "data")

val mergeExpr = expr("aggregate(data, map(), (acc, i) -> map_concat(acc, i))")

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeExpr.as("merged_data"))
  .show(false)

// +---+------------------------------+
// |id |merged_data                   |
// +---+------------------------------+
// |1  |[k1 -> v1, k2 -> v3, k3 -> v3]|
// |2  |[k4 -> v4, k6 -> v6, k5 -> v5]|
// +---+------------------------------+

我们通过内置函数连接数据列的map_concat所有Map项目,该函数允许我们将聚合应用于列表对。aggregate

注意:当前 Spark 2.4.5 上的 map_concat 实现允许相同的键共存。这很可能是一个错误,因为根据官方文档,这不是预期的行为。请注意这一点。

如果您想避免这种情况,您也可以选择 UDF:

import org.apache.spark.sql.functions.{collect_list, udf}

val mergeMapUDF = udf((data: Seq[Map[String, String]]) => data.reduce(_ ++ _))

df.groupBy("id").agg(collect_list("data").as("data"))
  .select($"id", mergeMapUDF($"data").as("merged_data"))
  .show(false)

推荐阅读