首页 > 解决方案 > 使用所需的键和值更新 Map 类型的 Spark 数据帧的列

问题描述

我有一个以下 spark 数据框,其中所有列(主键列 emp_id 除外)都由一个映射组成(键 'from' 和 'to' 可以有空值)。我想评估每列的'from'和'to'(emp_id除外)并向地图添加一个新键(名为'change'),如果'from'值为null,则值为a)'insert'并且'to' 不为 null b) 如果 'to' 值为 null 且 'from' 不为 null b) 'update' 如果 'from' 和 'to' 不为 null & 'from' 值不同于 '到'值

注意:具有空值的列将保持不变。

重要说明:这些列的类型不是 Map[String, String] 而是 Map[String, Any] 意味着值可以是其他结构对象

我们如何在 Scala 中实现这一点。

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                    |emp_site                          |

|1     |null                 |[from -> Will, to -> Watson]|null                 |[from -> 1000, to -> 8000]|[from ->, to -> Seattle]          |
|3     |null                 |[from -> Norman, to -> Nate]|null                 |[from -> 1000, to -> 8000]|[from -> CherryHill, to -> Newark]|
|4     |[from ->, to -> Iowa]|[from ->, to -> Ian]        |[from ->, to -> 1004]|[from ->, to -> 8000]     |[from ->, to -> Des Moines]       |

预期的:

|emp_id|emp_city             |emp_name                    |emp_phone            |emp_sal                    |emp_site                          |

|1     |null                 |[from -> Will, to -> Watson, change -> update]|null                 |[from -> 1000, to -> 8000, change -> update]|[from ->, to -> Seattle, change -> insert]          |
|3     |null                 |[from -> Norman, to -> Nate, change -> update]|null                 |[from -> 1000, to -> 8000, change -> update]|[from -> CherryHill, to -> Newark, change -> update]|
|4     |[from ->, to -> Iowa, change -> insert]|[from ->, to -> Ian, change -> insert]        |[from ->, to -> 1004, change -> insert]|[from ->, to -> 8000, change -> insert]     |[from ->, to -> Des Moines, change -> insert]       |

标签: scaladataframeapache-sparkapache-spark-sql

解决方案


您可以通过如下的行映射器函数来执行此操作,请查找内联代码说明

import org.apache.spark.sql.Row
object MapUpdater {

  def main(args: Array[String]): Unit = {

    val spark = Constant.getSparkSess

    import spark.implicits._

    //Load your data
    val df = List(
      (1,null,Map("from" ->"Will","to"-> "Watson"),null,Map("from" ->"1000","to"-> "8000"),Map("from" ->null,"to"-> "Seattle")),
      (2,null,Map("from" ->"Norman","to"-> "Nate"),null,Map("from" ->"1000","to"-> "8000"),Map("from" ->"CherryHill","to"-> "Newark")),
      (3,Map("from" ->null,"to"-> "Iowa"),Map("from" ->null,"to"-> "Ian"),Map("from" ->null,"to"-> "1004"),Map("from" ->"1000","to"-> "8000"),Map("from" ->null,"to"-> "Des Moines"))
    ).toDF("emp_id","emp_city","emp_name","emp_phone","emp_sal","emp_site")


    //Map each of your row
    df.map(row => {

      val new_emp_city = mapUpdater(row,1)
      val new_emp_name = mapUpdater(row,2)
      val new_emp_phone = mapUpdater(row,3)
      val new_emp_sal = mapUpdater(row,4)
      val new_emp_site = mapUpdater(row,5)

      (row.getInt(0),new_emp_city,new_emp_name,new_emp_phone,new_emp_sal,new_emp_site)

    }).toDF("emp_id","emp_city","emp_name","emp_phone","emp_sal","emp_site")
      .show(false)

  }

  //Row mapper function
  private def mapUpdater(row: Row,colId:Int): Map[String, String] = {
    val old_map = row.getAs[Map[String, String]](colId)

    val new_map: Map[String, String] = if (null != old_map) {
      if (null == old_map.getOrElse("from", null) && null != old_map.getOrElse("to", null)) {
        old_map + ("change" -> "Insert")
      } else if (null != old_map.getOrElse("from", null) && null == old_map.getOrElse("to", null)) {
        old_map + ("change" -> "Delete")
      } else old_map

    } else old_map
    (new_map)
  }
}

推荐阅读