首页 > 解决方案 > Spark Scala 更新数据框

问题描述

我有这样的问题:

val data = Seq(("TIM", "FIRST", "A", 1),
                   ("BIM", "SECOND", "A", 2),
                   ("JIM", "THIRD", "B", 1)).toDF("NAME", "POSITION", "GROUP", "INDEX")

    data.show()
    data.printSchema()

    val title = Seq(("A", "MASTER"), ("B", "TEACHER"),
                    ("C", "STUDENT")).toDF("LETTER", "DEGREE")

    title.show()
    title.printSchema()

+----+--------+-----+-----+
|NAME|POSITION|GROUP|INDEX|
+----+--------+-----+-----+
| TIM|   FIRST|    A|    1|
| BIM|  SECOND|    A|    2|
| JIM|   THIRD|    B|    1|
+----+--------+-----+-----+

root
 |-- NAME: string (nullable = true)
 |-- POSITION: string (nullable = true)
 |-- GROUP: string (nullable = true)
 |-- INDEX: integer (nullable = false)

+------+-------+
|LETTER| DEGREE|
+------+-------+
|     A| MASTER|
|     B|TEACHER|
|     C|STUDENT|
+------+-------+

root
 |-- LETTER: string (nullable = true)
 |-- DEGREE: string (nullable = true)

//Final result
+----+--------+-------+--'--+
|NAME|POSITION|  GROUP|INDEX|
+----+--------+-------+-----+
| TIM|   FIRST| MASTER|   1 |
| BIM|  SECOND|      A|   2 |
| JIM|   THIRD|TEACHER|   1 |
+----+--------+-------+-----+


我尝试了几件事:

val result = data.withColumn("GROUP", when('INDEX === 1, ???????????))

问号在哪里我尝试调用 UDF 但我无法从 GROUP 获取当前行值作为参数传递给 UDF。还尝试将选择放在 TITLE 和 GROUP = LETTER 中,但没有任何效果。

第一个数据框很大,而其他数据框在生产中非常小。

是否有一些优雅的方式没有先加入它们然后加入 withColumn ?

谢谢

标签: scaladataframeapache-spark

解决方案


使用广播加入:

data
  .join(broadcast(title),$"GROUP"===$"LETTER")
  .withColumn("GROUP",when($"INDEX"=== 1,$"DEGREE").otherwise($"GROUP"))
  .drop("LETTER","DEGREE")
  .show()

+----+--------+-------+-----+
|NAME|POSITION|  GROUP|INDEX|
+----+--------+-------+-----+
| TIM|   FIRST| MASTER|    1|
| BIM|  SECOND|      A|    2|
| JIM|   THIRD|TEACHER|    1|
+----+--------+-------+-----+

您也可以收集title到查找地图,广播此地图并使用 UDF,但实际上与广播连接相比没有优势


推荐阅读