Selecting DataFrame columns by unpacking a column collection together with another collection

Problem description

I have some code in which I unpack column names from a collection (called my_list), applying a specific condition to each name via the map function, like this:

val df1 = Seq(
  ("id1", "test_name1", "test_address1", "test_code1", "V1"),
  ("id2", "test_name2", "test_address2", "test_code2", "V2")
).toDF("col_a", "col_b", "col_c", "col_d", "col_e")

val df2 = Seq(
  ("id1", "test_name1.1", "test_address1.1", "test_code1.1", "V1"),
  ("id4", "test_name4", "test_address4", "test_code4", "V4")
).toDF("col_a", "col_b", "col_c", "col_d", "col_e")

val my_list = List("col_a", "col_b")
val my_list2 = List("col_c", "col_d", "col_e")
val joinDF = df1.as("l")
  .join(df2.as("r"), df1("col_a") === df2("col_a"), "leftouter")
  .select(col("l.col_c") :: col("l.col_d") :: col("l.col_e") :: my_list.map(my_function): _*)

my_function looks like this:

def my_function(columnName: String): org.apache.spark.sql.Column = {
  when(
    $"l.$columnName" === $"r.$columnName", null
  ).otherwise($"l.$columnName").as(columnName)
}

How can I also unpack another list (my_list2) in the joinDF select, so that the hard-coded set of column names (l.col_c, l.col_d, l.col_e) is avoided alongside the existing my_list.map(my_function): _*?

Because of the way variadic arguments work, I see a problem with unpacking a second collection.
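Note: in plain Scala, the : _* ascription adapts any single Seq expression, not just a bare list, so two collections can be concatenated before being expanded. A minimal, Spark-free sketch (takeAll is a hypothetical varargs method used only for illustration):

// : _* expands one Seq expression into varargs,
// so concatenating two lists first is perfectly legal.
def takeAll(xs: String*): Int = xs.length

val a = List("col_a", "col_b")
val b = List("col_c")
takeAll((a ++ b): _*) // returns 3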

Expected output:

+-------------+----------+-----+-----+----------+
|col_c        |col_d     |col_e|col_a|col_b     |
+-------------+----------+-----+-----+----------+
|test_address1|test_code1|V1   |null |test_name1|
|test_address2|test_code2|V2   |id2  |test_name2|
+-------------+----------+-----+-----+----------+

Tags: scala, apache-spark, apache-spark-sql

Solution


The SparkSession implicit conversions need to be imported inside my_function; otherwise the $"..." column interpolator is not in scope there:

def my_function(columnName: String): org.apache.spark.sql.Column = {
  val spark = SparkConfig.getSparkSession
  import spark.implicits._ // brings the $"..." column interpolator into scope

  when(
    $"l.$columnName" === $"r.$columnName", null
  ).otherwise($"l.$columnName").as(columnName)
}
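As a side note, the implicit import can be avoided entirely by building the columns with col from org.apache.spark.sql.functions. A sketch of an equivalent version (my_function_no_implicits is a name introduced here purely for illustration):

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, when}

// Same behavior, but col(...) replaces the $-interpolator,
// so no SparkSession implicits are required.
def my_function_no_implicits(columnName: String): Column =
  when(col(s"l.$columnName") === col(s"r.$columnName"), null)
    .otherwise(col(s"l.$columnName"))
    .as(columnName)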

The SparkConfig object:

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkConfig {
  val APP_NAME: String = "app_name"
  var sparkConf: SparkConf = _
  var sparkSession: SparkSession = _

  // Lazily build the SparkConf on first access (local master for testing).
  def getSparkConf: SparkConf = {
    if (sparkConf == null) {
      sparkConf = new SparkConf().setAppName(APP_NAME)
      sparkConf.setMaster("local[*]")
    }
    sparkConf
  }

  // Lazily create (or reuse) the shared SparkSession.
  def getSparkSession: SparkSession = {
    if (sparkSession == null)
      sparkSession = SparkSession.builder().enableHiveSupport().config(getSparkConf).getOrCreate()
    sparkSession
  }
}

The rest of the code is essentially unchanged:

val spark = SparkConfig.getSparkSession
import spark.implicits._

val df1 = Seq(
  ("id1", "test_name1", "test_address1", "test_code1", "V1"),
  ("id2", "test_name2", "test_address2", "test_code2", "V2")
).toDF("col_a", "col_b", "col_c", "col_d", "col_e")

val df2 = Seq(
  ("id1", "test_name1.1", "test_address1.1", "test_code1.1", "V1"),
  ("id4", "test_name4", "test_address4", "test_code4", "V4")
).toDF("col_a", "col_b", "col_c", "col_d", "col_e")

val my_list = List("col_a", "col_b")
val my_list2 = List("col_c", "col_d", "col_e")
val joinDF = df1.as("l")
  .join(df2.as("r"), df1("col_a") === df2("col_a"), "leftouter")
  .select(col("l.col_c") :: col("l.col_d") :: col("l.col_e") :: my_list.map(my_function): _*)

joinDF.show()

Output:

+-------------+----------+-----+-----+----------+
|        col_c|     col_d|col_e|col_a|     col_b|
+-------------+----------+-----+-----+----------+
|test_address1|test_code1|   V1| null|test_name1|
|test_address2|test_code2|   V2|  id2|test_name2|
+-------------+----------+-----+-----+----------+
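Finally, to answer the question of avoiding the hard-coded l.col_c, l.col_d, and l.col_e columns: since : _* accepts any single sequence, my_list2 can be mapped to plain left-side columns and concatenated with the mapped my_list before unpacking. A sketch (selectCols and joinDF2 are names introduced here for illustration):

// Unpack both lists: my_list2 as plain left-side columns,
// my_list through my_function, concatenated into one Seq before : _*
val selectCols = my_list2.map(c => col(s"l.$c")) ++ my_list.map(my_function)

val joinDF2 = df1.as("l")
  .join(df2.as("r"), df1("col_a") === df2("col_a"), "leftouter")
  .select(selectCols: _*)

This produces the same output as above without listing the my_list2 columns by hand.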
