scala - Spark Scala 更新数据框
问题描述
我有这样的问题:
val data = Seq(("TIM", "FIRST", "A", 1),
("BIM", "SECOND", "A", 2),
("JIM", "THIRD", "B", 1)).toDF("NAME", "POSITION", "GROUP", "INDEX")
data.show()
data.printSchema()
val title = Seq(("A", "MASTER"), ("B", "TEACHER"),
("C", "STUDENT")).toDF("LETTER", "DEGREE")
title.show()
title.printSchema()
+----+--------+-----+-----+
|NAME|POSITION|GROUP|INDEX|
+----+--------+-----+-----+
| TIM| FIRST| A| 1|
| BIM| SECOND| A| 2|
| JIM| THIRD| B| 1|
+----+--------+-----+-----+
root
|-- NAME: string (nullable = true)
|-- POSITION: string (nullable = true)
|-- GROUP: string (nullable = true)
|-- INDEX: integer (nullable = false)
+------+-------+
|LETTER| DEGREE|
+------+-------+
| A| MASTER|
| B|TEACHER|
| C|STUDENT|
+------+-------+
root
|-- LETTER: string (nullable = true)
|-- DEGREE: string (nullable = true)
//Final result
+----+--------+-------+--'--+
|NAME|POSITION| GROUP|INDEX|
+----+--------+-------+-----+
| TIM| FIRST| MASTER| 1 |
| BIM| SECOND| A| 2 |
| JIM| THIRD|TEACHER| 1 |
+----+--------+-------+-----+
我尝试了几件事:
val result = data.withColumn("GROUP", when('INDEX === 1, ???????????))
问号在哪里我尝试调用 UDF 但我无法从 GROUP 获取当前行值作为参数传递给 UDF。还尝试将选择放在 TITLE 和 GROUP = LETTER 中,但没有任何效果。
第一个数据框很大,而其他数据框在生产中非常小。
是否有一些优雅的方式没有先加入它们然后加入 withColumn ?
谢谢
解决方案
使用广播加入:
data
.join(broadcast(title),$"GROUP"===$"LETTER")
.withColumn("GROUP",when($"INDEX"=== 1,$"DEGREE").otherwise($"GROUP"))
.drop("LETTER","DEGREE")
.show()
+----+--------+-------+-----+
|NAME|POSITION| GROUP|INDEX|
+----+--------+-------+-----+
| TIM| FIRST| MASTER| 1|
| BIM| SECOND| A| 2|
| JIM| THIRD|TEACHER| 1|
+----+--------+-------+-----+
您也可以收集title
到查找地图,广播此地图并使用 UDF,但实际上与广播连接相比没有优势
推荐阅读
- c++ - GNU ARM Embedded Toolchain 与具有裸机 ARM 架构的普通 gcc/g++ 之间的区别
- ruby-on-rails - Ruby on Rails - 从孙表中获取记录
- python - 模棱两可的真值 - For 循环和 If 语句
- multithreading - Coordinating threads using shared std::atomic as a counter
- php - 修改 session.save_path 后 session_destroy 不起作用(会话数据保存在服务器上)
- python - 为什么我不能在堆栈中推送多个数字来计算前缀?
- asp.net-core - ASP.NET Core MVC:发布项目后,如何强制迁移更新到数据库?
- laravel - SSL Error in Laravel while calling external API
- javascript - 如何在jQuery中选择范围之间的元素
- java - finding object reference in array by index