scala - Apache Spark 迭代 DataFrame 列并应用值转换
问题描述
我将 csv 文件读入 Spark DataFrame 并根据 cvs 文件头推断列名:
val df = spark.read
.format("org.apache.spark.csv")
.option("header", true)
.option("inferSchema", true)
.csv("users.csv")
现在我需要转换列值,例如:
val modifedDf1 = df.withColumn("country", when(col("country") === "Italy", "[ITALY]").otherwise(col("country")))
val modifedDf2 = modifedDf1.withColumn("city", when(col("city") === "Milan", "[MILAN]").otherwise(col("city")))
如您所见,为了修改列值,我需要显式选择列withColumn("city"..
,然后应用条件。
现在,我需要为要修改的每一列重复此代码。
是否可以重写此代码以迭代df
DataFrame 中的每一列并应用以下内容(在伪代码中):
df.foreachColumn {
if (col_name == 'country'))
then when(col_value === "Italy", "[ITALY]").otherwise(col_value)
else if (col_name == 'city'))
then when(col_value === "Milan", "[MILAN]").otherwise(col_value)
}
我会欣赏 Scala 中的示例。
更新
这是我原来的df:
+------+------------------+--------------+-------------+
|name |email |phone |country |
+------+------------------+--------------+-------------+
|Mike |mike@example.com |+91-9999999999|Italy |
|Alex |alex@example.com |+91-9999999998|France |
|John |john@example.com |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+
我现在有以下代码:
val columnsModify = df.columns.map(col).map(column => {
val columnName = s"${column}"
if (columnName == "country") {
column as "[COUNTRY]"
} else if (columnName == "email") {
column as "(EMAIL)"
} else {
column as columnName
}
})
它能够迭代 DataFrame 列并根据指定的条件更改它们的名称。
这是输出:
+------+------------------+--------------+-------------+
|name |(EMAIL) |phone |[COUNTRY] |
+------+------------------+--------------+-------------+
|Mike |mike@example.com |+91-9999999999|Italy |
|Alex |alex@example.com |+91-9999999998|France |
|John |john@example.com |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+
我还需要为列值添加转换逻辑,如下所示(请参阅下面的注释行):
val columnsModify = df.columns.map(col).map(column => {
val columnName = s"${column}"
if (columnName == "country") {
//when(column_value === "Italy", "[ITALY]").otherwise(column_value)
column as "[COUNTRY]"
} else if (columnName == "email") {
column as "(EMAL)"
} else {
column as columnName
}
})
这个脚本的预期输出应该是:
+------+------------------+--------------+-------------+
|name |(EMAL) |phone |[COUNTRY] |
+------+------------------+--------------+-------------+
|Mike |mike@example.com |+91-9999999999|[ITALY] |
|Alex |alex@example.com |+91-9999999998|France |
|John |john@example.com |+1-1111111111 |United States|
|Donald|donald@example.com|+1-2222222222 |United States|
+------+------------------+--------------+-------------+
请展示如何实现它。
解决方案
val newCols = df.schema.map{
column =>
val colName = column.name
colName match{
case "country" => when(col(colName) === "Italy", "ITALY").otherwise(col(colName)).as("[COUNTRY]")
case "email" => col(colName).as("[EMAIL]")
case _ => col(colName)
}
}
df.select(newCols.head, newCols.tail: _*)
推荐阅读
- postgresql - Postgres:在同一列字符串中包含子选择和子选择的派生计算
- php - 在 php/symfony 中取消设置会话数组的有效方法
- dart - 使用 Aqueduct API Dart 在 Heroku 中部署的错误
- android - Android 应用程序(Ionic、React)中没有后端连接,但它在具有相同基本代码的 Web 应用程序中工作
- chart.js - 如何忽略 chart.js 中的标签
- html - Ionic:如何在 Shadow DOM 中更改 span.button-inner 的 CSS
- docker - Pulumi 不执行 Kubernetes Pod 的优雅关闭
- sql-server - 仅 6 分钟后在 SQL Server 超时后重建索引。超时设置为 60 分钟
- javascript - 在调整 Mat-table 大小时检测滚动条是否可见
- node.js - 尝试在 graphql 中检索模式时出现问题