首页 > 解决方案 > 使用所需的键和值更新 Map 类型的 Spark 数据框的列

问题描述

我有一个以下 spark 数据框,其中所有列(主键列 emp_id 除外)都由一个映射组成(键 'from' 和 'to' 可以有空值)。我想评估每列的'from'和'to'(emp_id除外)并向地图添加一个新键(名为'change'),如果'from'值为null,则值为a)'insert'并且'to' 不为 null b) 如果 'to' 值为 null 且 'from' 不为 null b) 'update' 如果 'from' 和 'to' 不为 null & 'from' 值不同于 '到'值

注意:具有空值的列将保持不变。

我们如何在 Scala 中实现这一点。

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                    |emp_site                          |

|1     |null                 |[from -> Will, to -> Watson]|null                 |[from -> 1000, to -> 8000]|[from ->, to -> Seattle]          |
|3     |null                 |[from -> Norman, to -> Nate]|null                 |[from -> 1000, to -> 8000]|[from -> CherryHill, to -> Newark]|
|4     |[from ->, to -> Iowa]|[from ->, to -> Ian]        |[from ->, to -> 1004]|[from ->, to -> 8000]     |[from ->, to -> Des Moines]       |

预期的:

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                    |emp_site                          |

|1     |null                 |[from -> Will, to -> Watson, change -> update]|null                 |[from -> 1000, to -> 8000, change -> update]|[from ->, to -> Seattle, change -> insert]          |
|3     |null                 |[from -> Norman, to -> Nate, change -> update]|null                 |[from -> 1000, to -> 8000, change -> update]|[from -> CherryHill, to -> Newark, change -> update]|
|4     |[from ->, to -> Iowa, change -> insert]|[from ->, to -> Ian, change -> insert]        |[from ->, to -> 1004, change -> insert]|[from ->, to -> 8000, change -> insert]     |[from ->, to -> Des Moines, change -> insert]       |

标签: scaladataframeapache-sparkapache-spark-sql

解决方案


实现此目的的一种方法是使用UDF,这不是一个很好的解决方案,但我想不出其他解决方案。

尽量不要UDF使用

val updateMap = udf((input: Map[String, String]) => {
  if (input == null || input.isEmpty)
    Map.empty[String, String]
  else if (input("from") == null && input("to") != null)
    input + ("change" -> "insert")
  else if (input("from") != null && input("to") == null)
    input + ("change" -> "delete")
  else if (!(input("from").equals(input("to"))))
    input + ("change" -> "update")
  else
    Map.empty[String, String]

})

val result = df.columns.tail.foldLeft(df) { (acc, name) =>
  acc.withColumn(name, updateMap(col(name)))
}

确保您的列是Map[String, String]

希望这可以帮助!


推荐阅读