首页 > 解决方案 > 生成单行数据框以进行查找

问题描述

这是我之前发布的后续问题

第1步:

scala> spark.sql("select map('s1', 'p1', 's2', 'p2', 's3', 'p3') as lookup").show()
+--------------------+
|              lookup|
+--------------------+
|[s1 -> p1, s2 -> ...|
+--------------------+

第2步:

scala> val df = Seq(("s1", "p1"), ("s2", "p2"), ("s3", "p3")).toDF("s", "p")
df: org.apache.spark.sql.DataFrame = [s: string, p: string]

scala> df.show()
+---+---+
|  s|  p|
+---+---+
| s1| p1|
| s2| p2|
| s3| p3|
+---+---+

第 3 步:

scala> val df1 = df.selectExpr("map(s,p) lookup")
df1: org.apache.spark.sql.DataFrame = [cc: map<string,string>]

scala> df1.show()
+----------+
|    lookup|
+----------+
|[s1 -> p1]|
|[s2 -> p2]|
|[s3 -> p3]|
+----------+

我在步骤 3 中的预期结果是我在步骤 1 中得到的结果。我怎样才能实现它?

标签: scalaapache-sparkapache-spark-sqlspark-structured-streaming

解决方案


key 和 value 的两列在合并到map之前应该聚合到数组中。

import org.apache.spark.sql.functions._

df.agg(collect_list("s").as("s"), collect_list("p").as("p"))
    .select(map_from_arrays('s,'p).as("lookup"))
    .show(false)

输出:

+------------------------------+
|lookup                        |
+------------------------------+
|[s1 -> p1, s2 -> p2, s3 -> p3]|
+------------------------------+

如果没有collect_list调用,每一行将单独转换为地图。


推荐阅读