Apply a function to a column inside a Spark DataFrame struct, replacing that column

Problem description

I couldn't find exactly what I was looking for, so here is my question. I load some data from MongoDB into a Spark DataFrame. The DataFrame has the following schema (df.printSchema):

|-- flight: struct (nullable = true)
|    |-- legs: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- arrival: timestamp (nullable = true)
|    |    |    |-- departure: timestamp (nullable = true)
|    |-- segments: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- arrival: timestamp (nullable = true)
|    |    |    |-- departure: timestamp (nullable = true)

Note the top-level struct followed by an array: the data I need to change is inside that array. For example:

{
  "flight": {
    "legs": [{
        "departure": ISODate("2020-10-30T13:35:00.000Z"),
        "arrival": ISODate("2020-10-30T14:47:00.000Z")
      }
    ],
    "segments": [{
        "departure": ISODate("2020-10-30T13:35:00.000Z"),
        "arrival": ISODate("2020-10-30T14:47:00.000Z")
      }
    ]
  }
}

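I want to export this as JSON, but for business reasons I want the arrival dates formatted differently from the departure dates. For example, I might want to export the departure ISODate as milliseconds since the epoch, but leave the arrival as it is.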
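To make the two target formats concrete with the sample values above, here is a small plain-Scala sketch (no Spark involved; the `LegFormats` object and its method names are hypothetical). It turns a leg's departure into epoch milliseconds and keeps its arrival as an ISO-8601 string:

```scala
import java.time.Instant

// Hypothetical helper illustrating the two output formats for a single leg.
object LegFormats {
  // Departure: milliseconds since the Unix epoch (an absolute instant, so no time zone involved).
  def departureAsEpochMillis(departure: Instant): Long = departure.toEpochMilli

  // Arrival: left as an ISO-8601 UTC string.
  def arrivalAsIso(arrival: Instant): String = arrival.toString

  def main(args: Array[String]): Unit = {
    val departure = Instant.parse("2020-10-30T13:35:00Z")
    val arrival   = Instant.parse("2020-10-30T14:47:00Z")
    println(departureAsEpochMillis(departure)) // 1604064900000
    println(arrivalAsIso(arrival))             // 2020-10-30T14:47:00Z
  }
}
```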

To do this, I thought of applying a custom function to perform the conversion:

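  import java.sql.Timestamp
  import org.apache.spark.sql.expressions.UserDefinedFunction
  import org.apache.spark.sql.functions.udf

  // Here I can do any transformation. I hope to replace the timestamp with the needed value
  val doSomething: UserDefinedFunction = udf((value: Seq[Timestamp]) => {
    value.map(x => "doSomething" + x.getTime)
  })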

  val newDf = df.withColumn("flight.legs.departure",
    doSomething(df.col("flight.legs.departure")))

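But this just adds a brand-new top-level column, literally named flight.legs.departure and containing an array with a single doSomething string, instead of replacing the nested field: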

{
  "flight": {
    "legs": [{
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z"
      }
    ],
    "segments": [{
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z"
      }
    ]
  },
  "flight.legs.departure": ["doSomething1596268800000"]
}

newDf.show(1)

+--------------------+---------------------+
|              flight|flight.legs.departure|
+--------------------+---------------------+
|[[[182], 94, [202...| [doSomething15962...|
+--------------------+---------------------+

Instead of what I actually want:

{
  ...
        "arrival": "2020-10-30T14:47:00Z",
        //leg departure date that I changed
        "departure": "doSomething1596268800000"
  ...   // segments not affected in this example
        "arrival": "2020-10-30T14:47:00Z",
        "departure": "2020-10-30T13:35:00Z",
 ...
}

Any ideas on how to proceed?

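Edit (clarification): keep in mind that my schema is much more complex than shown above. For example, there is another top-level data tag, and there is other information under flight. Inside flight, legs and segments also have more elements, some of which are nested as well. I am focusing only on the ones I need to change.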

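I mention this because I want the simplest solution that scales, i.e. ideally one that changes only the required elements without deconstructing and rebuilding the whole nested structure. If that can't be avoided, is using case classes the simplest solution?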
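For the case-class question, here is a Spark-free sketch of what that route involves. It uses hypothetical case classes that model only the fields shown above. In Spark, the same convert function would presumably be applied as df.as[Record].map(convert) after import spark.implicits._ (an assumption, not run here), and any column not modelled in the case classes would not survive that map. Because departure changes type from timestamp to string, copy cannot be used, so every enclosing level needs a mirrored output class. That is exactly the scaling cost the edit is worried about:

```scala
import java.sql.Timestamp
import java.time.Instant

// Hypothetical input model: only the fields shown in the question.
final case class Leg(arrival: Timestamp, departure: Timestamp)
final case class Segment(arrival: Timestamp, departure: Timestamp)
final case class Flight(legs: Seq[Leg], segments: Seq[Segment])
final case class Record(flight: Flight)

// Hypothetical output model: only Leg.departure changes type (Timestamp -> String),
// yet Leg, Flight and Record each need an "Out" counterpart.
final case class LegOut(arrival: Timestamp, departure: String)
final case class FlightOut(legs: Seq[LegOut], segments: Seq[Segment])
final case class RecordOut(flight: FlightOut)

object CaseClassSketch {
  // Per-row conversion; segments are passed through untouched.
  def convert(r: Record): RecordOut =
    RecordOut(
      FlightOut(
        legs = r.flight.legs.map(l => LegOut(l.arrival, "doSomething" + l.departure.getTime)),
        segments = r.flight.segments
      )
    )

  def main(args: Array[String]): Unit = {
    val dep = Timestamp.from(Instant.parse("2020-10-30T13:35:00Z"))
    val arr = Timestamp.from(Instant.parse("2020-10-30T14:47:00Z"))
    val in  = Record(Flight(Seq(Leg(arr, dep)), Seq(Segment(arr, dep))))
    println(convert(in).flight.legs.head.departure) // doSomething1604064900000
  }
}
```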

Tags: mongodb, scala, apache-spark

Solution


Please check the code below.

Execution time (measured with spark.time on the one-row sample data):

With UDF: 679 ms

Without UDF: 1493 ms

Code With UDF

scala> :paste
// Entering paste mode (ctrl-D to finish)

  // Creating UDF to update value inside array.
  import java.text.SimpleDateFormat
  // In my sample the departure values are strings, so this parses them (HH = 24-hour clock; the JVM default time zone is used).
  val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss")
  val doSomething = udf((value: Seq[String]) => {
     value.map(x => s"dosomething${dateFormat.parse(x).getTime}")
  })

// Exiting paste mode, now interpreting.

import java.text.SimpleDateFormat
dateFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@41bd83a
doSomething: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(ArrayType(StringType,true))))


scala> :paste
// Entering paste mode (ctrl-D to finish)

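spark.time {
  val updated = df
    .select("flight.*") // flatten flight into legs and segments columns
    .withColumn(
      "legs",
      arrays_zip($"legs.arrival", doSomething($"legs.departure")) // pair each arrival with its converted departure
        .cast("array<struct<arrival:string,departure:string>>") // force the struct field names back to arrival/departure
    )
    .select(struct($"segments", $"legs").as("flight")) // rebuild flight (field order: segments, legs)
  updated.printSchema
  updated.show(false)
}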

// Exiting paste mode, now interpreting.

root
 |-- flight: struct (nullable = false)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)

+-------------------------------------------------------------------------------------------------+
|flight                                                                                           |
+-------------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, 2020-10-30T13:35:00]], [[2020-10-30T14:47:00, dosomething1604045100000]]]|
+-------------------------------------------------------------------------------------------------+

Time taken: 679 ms

scala>
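SimpleDateFormat interprets the string in the JVM's default time zone. In the output above, dosomething1604045100000 is 2020-10-30T08:05:00Z, i.e. 13:35 read in a UTC+05:30 zone, whereas 13:35 UTC would be 1604064900000. Here is a sketch of the same per-element logic using java.time with an explicit offset. It assumes the strings are UTC wall-clock times; DepartureConversion is a hypothetical name:

```scala
import java.time.{LocalDateTime, ZoneOffset}

object DepartureConversion {
  // Same shape as the UDF body: Seq[String] in, Seq[String] out.
  // LocalDateTime.parse uses ISO_LOCAL_DATE_TIME, which accepts "2020-10-30T13:35:00".
  def doSomething(values: Seq[String]): Seq[String] =
    values.map { x =>
      // Interpret the wall-clock time as UTC (assumption), then take epoch milliseconds.
      val millis = LocalDateTime.parse(x).toInstant(ZoneOffset.UTC).toEpochMilli
      s"dosomething$millis"
    }

  def main(args: Array[String]): Unit =
    println(doSomething(Seq("2020-10-30T13:35:00"))) // List(dosomething1604064900000)
}
```

java.time types are immutable and thread-safe, unlike SimpleDateFormat, and the result no longer depends on the machine's time zone. In spark-shell the function could then be wrapped as udf(DepartureConversion.doSomething _) and used in place of the doSomething UDF above.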

Code Without UDF

scala> val df = spark.read.json(Seq("""{"flight": {"legs": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}],"segments": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}]}}""").toDS)
df: org.apache.spark.sql.DataFrame = [flight: struct<legs: array<struct<arrival:string,departure:string>>, segments: array<struct<arrival:string,departure:string>>>]

scala> df.printSchema
root
 |-- flight: struct (nullable = true)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)


scala> df.show(false)
+--------------------------------------------------------------------------------------------+
|flight                                                                                      |
+--------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, 2020-10-30T13:35:00]], [[2020-10-30T14:47:00, 2020-10-30T13:35:00]]]|
+--------------------------------------------------------------------------------------------+


scala> :paste
// Entering paste mode (ctrl-D to finish)

spark.time {
  val updated = df
    .select("flight.*")
    .select($"segments", $"legs.arrival", $"legs.departure") // pull the legs fields out as two array columns
    .withColumn("departure", explode($"departure")) // one row per departure value
    .withColumn("departure", concat_ws("-", lit("something"), $"departure".cast("timestamp").cast("long"))) // string -> timestamp (session time zone) -> long, i.e. seconds (not ms) since the epoch
    .groupBy($"segments", $"arrival") // group back on every column except departure
    .agg(collect_list($"departure").as("departure")) // rebuild the array; collect_list does not guarantee the original order
    .select($"segments", arrays_zip($"arrival", $"departure").as("legs")) // zip arrival & departure back into legs
    .select(struct($"legs", $"segments").as("flight")) // finally rebuild flight from legs & segments

  updated.printSchema
  updated.show(false)
}

// Exiting paste mode, now interpreting.

root
 |-- flight: struct (nullable = false)
 |    |-- legs: array (nullable = true)
 |    |    |-- element: struct (containsNull = false)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)
 |    |-- segments: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- arrival: string (nullable = true)
 |    |    |    |-- departure: string (nullable = true)

+---------------------------------------------------------------------------------------------+
|flight                                                                                       |
+---------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, something-1604045100]], [[2020-10-30T14:47:00, 2020-10-30T13:35:00]]]|
+---------------------------------------------------------------------------------------------+

Time taken: 1493 ms

scala>
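If Spark 3.1 or later is available (an assumption, since the question does not state the version), Column.withField combined with the higher-order functions.transform can rewrite a single field inside each array element. This avoids exploding rows, regrouping, or listing sibling fields, which is closer to what the edit asks for. Below is a sketch on the same string-typed sample as the session above. The doSomething prefix, the millisecond conversion via a cast to double, the UTC session time zone, and the WithFieldSketch name are all illustrative assumptions:

```scala
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object WithFieldSketch {
  // Timestamp string -> "doSomething<epoch millis>".
  // cast("timestamp") uses the session time zone; cast("double") gives seconds with a fraction.
  def toMillisString(c: Column): Column =
    concat(lit("doSomething"), (c.cast("timestamp").cast("double") * 1000).cast("long").cast("string"))

  // Replace only flight.legs[*].departure; every other field is kept as is.
  def updateLegDepartures(df: DataFrame): DataFrame =
    df.withColumn(
      "flight",
      col("flight").withField(
        "legs",
        transform(col("flight.legs"), leg => leg.withField("departure", toMillisString(leg.getField("departure"))))
      )
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("withField-sketch")
      .config("spark.sql.session.timeZone", "UTC") // assumption: source times are UTC
      .getOrCreate()
    import spark.implicits._

    val df = spark.read.json(Seq(
      """{"flight": {"legs": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}],"segments": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}]}}"""
    ).toDS)

    val updated = updateLegDepartures(df)
    updated.printSchema()
    updated.toJSON.show(false)
    spark.stop()
  }
}
```

Because withField only replaces the named field, other fields under flight, or extra fields inside each leg, pass through unchanged. segments could be handled with a second withField call in the same way. Unlike the collect_list route, transform works element by element, so the array order is preserved.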

