Apache Spark: iterate over DataFrame columns and apply value transformations

Problem

I read a CSV file into a Spark DataFrame, inferring the column names from the CSV header:

val df = spark.read
  .format("csv")
  .option("header", true)
  .option("inferSchema", true)
  .csv("users.csv")

Now I need to transform some column values, for example:

val modifiedDf1 = df.withColumn("country", when(col("country") === "Italy", "[ITALY]").otherwise(col("country")))

val modifiedDf2 = modifiedDf1.withColumn("city", when(col("city") === "Milan", "[MILAN]").otherwise(col("city")))

As you can see, to modify a column value I have to explicitly select the column with withColumn("city", ...) and then apply the condition.

I then have to repeat this code for every column I want to modify.
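A common pattern to avoid this repetition is to keep the per-column rules in a collection and fold them over the DataFrame. A minimal sketch, assuming the `df` from above; the rule list here is illustrative:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, when}

// Illustrative rules: (column name, value to match, replacement)
val rules = Seq(
  ("country", "Italy", "[ITALY]"),
  ("city", "Milan", "[MILAN]")
)

// Apply each rule in turn; columns without a rule are left untouched
val transformed: DataFrame = rules.foldLeft(df) { case (acc, (name, from, to)) =>
  acc.withColumn(name, when(col(name) === from, lit(to)).otherwise(col(name)))
}
```

Adding a new column transformation then only requires extending the `rules` list, not writing another `withColumn` call.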

Is it possible to rewrite this code to iterate over every column of the `df` DataFrame and apply something like the following (in pseudocode)?

df.foreachColumn {
    if (col_name == "country")
        then when(col_value === "Italy", "[ITALY]").otherwise(col_value)
    else if (col_name == "city")
        then when(col_value === "Milan", "[MILAN]").otherwise(col_value)
}

I would appreciate an example in Scala.

Update

Here is my original `df`:

+------+------------------+--------------+-------------+
|name  |email             |phone         |country      |
+------+------------------+--------------+-------------+
|Mike  |mike@example.com  |+91-9999999999|Italy        |
|Alex  |alex@example.com  |+91-9999999998|France       |
|John  |john@example.com  |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+

I now have the following code:

val columnsModify = df.columns.map(col).map(column => {
  val columnName = s"${column}"
  if (columnName == "country") {
    column as "[COUNTRY]"
  } else if (columnName == "email") {
    column as "(EMAIL)"
  } else {
    column as columnName
  }
})

It iterates over the DataFrame columns and renames them according to the specified conditions.

This is the output:

+------+------------------+--------------+-------------+
|name  |(EMAIL)           |phone         |[COUNTRY]    |
+------+------------------+--------------+-------------+
|Mike  |mike@example.com  |+91-9999999999|Italy        |
|Alex  |alex@example.com  |+91-9999999998|France       |
|John  |john@example.com  |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+
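For reference, `columnsModify` above is an `Array[Column]`, so the renaming can be applied with a single `select` (this is assumed to be how the table above was produced):

```scala
// Expand the Column array into select's varargs
val renamedDf = df.select(columnsModify: _*)
renamedDf.show(false)
```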

I also need to add transformation logic for the column values, like this (see the commented line below):

val columnsModify = df.columns.map(col).map(column => {
  val columnName = s"${column}"
  if (columnName == "country") {
    //when(column_value === "Italy", "[ITALY]").otherwise(column_value)
    column as "[COUNTRY]"
  } else if (columnName == "email") {
    column as "(EMAIL)"
  } else {
    column as columnName
  }
})

The expected output of this script should be:

+------+------------------+--------------+-------------+
|name  |(EMAIL)           |phone         |[COUNTRY]    |
+------+------------------+--------------+-------------+
|Mike  |mike@example.com  |+91-9999999999|[ITALY]      |
|Alex  |alex@example.com  |+91-9999999998|France       |
|John  |john@example.com  |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+

Please show how to implement this.

Tags: scala, apache-spark, apache-spark-sql

Solution


import org.apache.spark.sql.functions.{col, when}

val newCols = df.schema.map { column =>
  val colName = column.name

  colName match {
    // Rewrite the value first, then rename the column
    case "country" => when(col(colName) === "Italy", "[ITALY]").otherwise(col(colName)).as("[COUNTRY]")
    case "email"   => col(colName).as("(EMAIL)")
    case _         => col(colName)
  }
}

df.select(newCols.head, newCols.tail: _*)
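If many columns need similar treatment, the `match` can be replaced by lookup maps, so adding a new rule does not require touching the iteration code. A sketch along the same lines; the map contents are illustrative:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, when}

// Illustrative rules: per-column value rewrites and aliases
val valueRules: Map[String, Column => Column] = Map(
  "country" -> (c => when(c === "Italy", "[ITALY]").otherwise(c))
)
val aliases: Map[String, String] = Map(
  "country" -> "[COUNTRY]",
  "email"   -> "(EMAIL)"
)

// For each column: apply its value rule (if any), then its alias (if any)
val newCols2 = df.columns.map { name =>
  val transformed = valueRules.getOrElse(name, (c: Column) => c)(col(name))
  transformed.as(aliases.getOrElse(name, name))
}

df.select(newCols2: _*)
```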
