scala - Selecting DataFrame columns by unpacking a collection of column names together with another collection
Problem description
I have a piece of code in which I unpack column names from a collection (called my_list), applying a specific condition via a map function, as follows:
val df1 = Seq(
  ("id1", "test_name1", "test_address1", "test_code1", "V1"),
  ("id2", "test_name2", "test_address2", "test_code2", "V2")
).toDF("col_a", "col_b", "col_c", "col_d", "col_e")

val df2 = Seq(
  ("id1", "test_name1.1", "test_address1.1", "test_code1.1", "V1"),
  ("id4", "test_name4", "test_address4", "test_code4", "V4")
).toDF("col_a", "col_b", "col_c", "col_d", "col_e")

val my_list = List("col_a", "col_b")
val my_list2 = List("col_c", "col_d", "col_e")

val joinDF = df1.as("l")
  .join(df2.as("r"), df1("col_a") === df2("col_a"), "leftouter")
  .select(col("l.col_c") :: col("l.col_d") :: col("l.col_e") :: my_list.map(my_function): _*)
my_function looks like this:
def my_function(columnName: String): org.apache.spark.sql.Column = {
  when(
    $"l.$columnName" === $"r.$columnName", null
  ).otherwise($"l.$columnName").as(columnName)
}
How can I also unpack the other list (my_list2) in joinDF, so that the hard-coded set of columns (l.col_c, l.col_d, l.col_e) is avoided alongside the existing my_list.map(my_function): _*?
I see a problem with unpacking a second collection because of how variadic arguments work.
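The varargs mechanics can be illustrated with plain Scala (no Spark needed). A minimal sketch, using a hypothetical `select(cols: String*)` as a stand-in for DataFrame.select(cols: Column*): two lists must be concatenated *before* the single `: _*` expansion, since `: _*` can only appear once, on the final argument.

```scala
// Stand-in for DataFrame.select(cols: Column*) -- varargs take one sequence.
def select(cols: String*): List[String] = cols.toList

val plain  = List("col_c", "col_d", "col_e") // stand-ins for col("l....") columns
val mapped = List("col_a", "col_b")          // stand-ins for my_function results

// Concatenate first, then unpack once with `: _*`.
// `select(plain: _*, mapped: _*)` would not compile.
val result = select((plain ++ mapped): _*)
// result == List("col_c", "col_d", "col_e", "col_a", "col_b")
```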
Expected output:
+-------------+----------+-----+-----+----------+
|col_c |col_d |col_e|col_a|col_b |
+-------------+----------+-----+-----+----------+
|test_address1|test_code1|V1 |null |test_name1|
|test_address2|test_code2|V2 |id2 |test_name2|
+-------------+----------+-----+-----+----------+
Solution
The SparkSession implicit conversions should be imported inside my_function:
import org.apache.spark.sql.functions.when

def my_function(columnName: String): org.apache.spark.sql.Column = {
  val spark = SparkConfig.getSparkSession
  import spark.implicits._
  when(
    $"l.$columnName" === $"r.$columnName", null
  ).otherwise($"l.$columnName").as(columnName)
}
SparkConfig:
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkConfig {
  val APP_NAME: String = "app_name"
  var sparkConf: SparkConf = _
  var sparkSession: SparkSession = _

  def getSparkConf: SparkConf = {
    if (sparkConf == null) {
      sparkConf = new SparkConf().setAppName(APP_NAME)
      sparkConf.setMaster("local[*]")
    }
    sparkConf
  }

  def getSparkSession: SparkSession = {
    if (sparkSession == null)
      sparkSession = SparkSession.builder().enableHiveSupport().config(getSparkConf).getOrCreate()
    sparkSession
  }
}
The rest of the code is essentially unchanged (note that show() returns Unit, so it is called separately rather than assigned to joinDF):
val spark = SparkConfig.getSparkSession
import spark.implicits._
import org.apache.spark.sql.functions.col

val df1 = Seq(
  ("id1", "test_name1", "test_address1", "test_code1", "V1"),
  ("id2", "test_name2", "test_address2", "test_code2", "V2")
).toDF("col_a", "col_b", "col_c", "col_d", "col_e")

val df2 = Seq(
  ("id1", "test_name1.1", "test_address1.1", "test_code1.1", "V1"),
  ("id4", "test_name4", "test_address4", "test_code4", "V4")
).toDF("col_a", "col_b", "col_c", "col_d", "col_e")

val my_list = List("col_a", "col_b")
val my_list2 = List("col_c", "col_d", "col_e")

val joinDF = df1.as("l")
  .join(df2.as("r"), df1("col_a") === df2("col_a"), "leftouter")
  .select(col("l.col_c") :: col("l.col_d") :: col("l.col_e") :: my_list.map(my_function): _*)

joinDF.show()
Output:
+-------------+----------+-----+-----+----------+
| col_c| col_d|col_e|col_a| col_b|
+-------------+----------+-----+-----+----------+
|test_address1|test_code1| V1| null|test_name1|
|test_address2|test_code2| V2| id2|test_name2|
+-------------+----------+-----+-----+----------+
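To also avoid hard-coding l.col_c, l.col_d and l.col_e, as the original question asks, my_list2 can be mapped to plain columns and concatenated with the mapped my_list before a single `: _*` unpack. A sketch, reusing the df1, df2, my_list, my_list2 and my_function definitions above:

```scala
// Build plain l-side Columns from my_list2, append the conditional
// Columns from my_list, and unpack the combined list once with `: _*`.
val joinDF = df1.as("l")
  .join(df2.as("r"), df1("col_a") === df2("col_a"), "leftouter")
  .select(my_list2.map(c => col(s"l.$c")) ++ my_list.map(my_function): _*)

joinDF.show()
```

This produces the same column order as the hard-coded version, since my_list2 columns come first and the mapped my_list columns follow.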