scala - Update a Map-type column of a Spark DataFrame with the required key and value
Problem description
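I have the following Spark DataFrame. Every column except the primary-key column emp_id holds a map, and the map's 'from' and 'to' keys can have null values. For each of these columns I want to evaluate 'from' and 'to' and add a new key named 'change' to the map, whose value is: a) 'insert' if 'from' is null and 'to' is not null; b) 'delete' if 'to' is null and 'from' is not null; c) 'update' if 'from' and 'to' are both non-null and the 'from' value differs from the 'to' value.
Note: columns whose value is null stay unchanged.
Important: these columns are not of type Map[String, String] but Map[String, Any], meaning the values can be other struct objects.
How can we achieve this in Scala?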
|emp_id|emp_city |emp_name |emp_phone |emp_sal |emp_site |
|1 |null |[from -> Will, to -> Watson]|null |[from -> 1000, to -> 8000]|[from ->, to -> Seattle] |
|3 |null |[from -> Norman, to -> Nate]|null |[from -> 1000, to -> 8000]|[from -> CherryHill, to -> Newark]|
|4 |[from ->, to -> Iowa]|[from ->, to -> Ian] |[from ->, to -> 1004]|[from ->, to -> 8000] |[from ->, to -> Des Moines] |
Expected output:
|emp_id|emp_city |emp_name |emp_phone |emp_sal |emp_site |
|1 |null |[from -> Will, to -> Watson, change -> update]|null |[from -> 1000, to -> 8000, change -> update]|[from ->, to -> Seattle, change -> insert] |
|3 |null |[from -> Norman, to -> Nate, change -> update]|null |[from -> 1000, to -> 8000, change -> update]|[from -> CherryHill, to -> Newark, change -> update]|
|4 |[from ->, to -> Iowa, change -> insert]|[from ->, to -> Ian, change -> insert] |[from ->, to -> 1004, change -> insert]|[from ->, to -> 8000, change -> insert] |[from ->, to -> Des Moines, change -> insert] |
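Solution
You can do this with a row mapper function like the one below; see the inline comments for an explanation. The function assumes the map columns hold String values (Map[String, String]).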
import org.apache.spark.sql.{Row, SparkSession}

object MapUpdater {
  def main(args: Array[String]): Unit = {
    // Local session; stands in for the answerer's own Constant.getSparkSess helper
    val spark = SparkSession.builder().master("local[*]").appName("MapUpdater").getOrCreate()
    import spark.implicits._

    // Load your data (the sample rows from the question)
    val df = List(
      (1, null, Map("from" -> "Will", "to" -> "Watson"), null, Map("from" -> "1000", "to" -> "8000"), Map("from" -> null, "to" -> "Seattle")),
      (3, null, Map("from" -> "Norman", "to" -> "Nate"), null, Map("from" -> "1000", "to" -> "8000"), Map("from" -> "CherryHill", "to" -> "Newark")),
      (4, Map("from" -> null, "to" -> "Iowa"), Map("from" -> null, "to" -> "Ian"), Map("from" -> null, "to" -> "1004"), Map("from" -> null, "to" -> "8000"), Map("from" -> null, "to" -> "Des Moines"))
    ).toDF("emp_id", "emp_city", "emp_name", "emp_phone", "emp_sal", "emp_site")

    // Map each row: column 0 is emp_id, columns 1 to 5 are the map columns
    df.map { row =>
      (row.getInt(0), mapUpdater(row, 1), mapUpdater(row, 2), mapUpdater(row, 3),
        mapUpdater(row, 4), mapUpdater(row, 5))
    }.toDF("emp_id", "emp_city", "emp_name", "emp_phone", "emp_sal", "emp_site")
      .show(false)

    spark.stop()
  }

  // Row mapper function: adds a "change" entry to the map stored in column colId
  private def mapUpdater(row: Row, colId: Int): Map[String, String] = {
    val oldMap = row.getAs[Map[String, String]](colId)
    if (oldMap == null) oldMap // null columns stay unchanged
    else {
      // A missing key and an explicit null value are treated the same way
      val from = oldMap.get("from").flatMap(Option(_))
      val to = oldMap.get("to").flatMap(Option(_))
      (from, to) match {
        case (None, Some(_))              => oldMap + ("change" -> "insert")
        case (Some(_), None)              => oldMap + ("change" -> "delete")
        case (Some(f), Some(t)) if f != t => oldMap + ("change" -> "update")
        case _                            => oldMap // both null, or from == to
      }
    }
  }
}
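The mapper above only handles String values, while the question says the map values can be other objects. Below is a minimal sketch of the same three rules written against Map[String, Any] in plain Scala, without Spark. The object name `ChangeTagger` and its methods are hypothetical, and comparing values with `!=` assumes the value type has a meaningful `equals` (tuples and case classes do).

```scala
// Hypothetical helper (not from the answer above): the insert/delete/update
// rules applied to a map whose values may be of any type.
object ChangeTagger {

  // Treat a missing key and an explicit null value the same way.
  private def valueOf(m: Map[String, Any], key: String): Option[Any] =
    m.get(key).flatMap(Option(_))

  // Returns the "change" label for one map, or None when no rule applies
  // (both values null, or from equal to to).
  def changeType(m: Map[String, Any]): Option[String] =
    (valueOf(m, "from"), valueOf(m, "to")) match {
      case (None, Some(_))              => Some("insert")
      case (Some(_), None)              => Some("delete")
      case (Some(f), Some(t)) if f != t => Some("update")
      case _                            => None
    }

  // A null map (the whole column value is null) is returned unchanged;
  // otherwise the label, if any, is added under the "change" key.
  def withChange(m: Map[String, Any]): Map[String, Any] =
    if (m == null) m
    else changeType(m).fold(m)(c => m + ("change" -> c))
}
```

For example, `ChangeTagger.withChange(Map("from" -> null, "to" -> "Seattle"))` adds `change -> insert`. Keep in mind that a Spark `MapType` has a single value type, so a String "change" entry can be stored in the same map only when the map's values are Strings; for struct-valued maps the label would have to live elsewhere, for instance in a separate column.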