mongodb - 将函数应用于 Spark DataFrame 结构内的列,替换该列
问题描述
我找不到我正在寻找的确切内容,所以这是我的问题。我从 MongoDb 获取一些数据到 Spark Dataframe 中。数据框具有以下架构 ( df.printSchema
):
|-- flight: struct (nullable = true)
| |-- legs: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- arrival: timestamp (nullable = true)
| | | |-- departure: timestamp (nullable = true)
| |-- segments: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- arrival: timestamp (nullable = true)
| | | |-- departure: timestamp (nullable = true)
请注意顶层结构,后跟一个数组,我需要在其中更改我的数据。例如:
{
"flight": {
"legs": [{
"departure": ISODate("2020-10-30T13:35:00.000Z"),
"arrival": ISODate("2020-10-30T14:47:00.000Z")
}
],
"segments": [{
"departure": ISODate("2020-10-30T13:35:00.000Z"),
"arrival": ISODate("2020-10-30T14:47:00.000Z")
}
]
}
}
我想在 Json 中导出它,但出于某种商业原因,我希望到达日期的格式与出发日期的格式不同。例如,我可能想从纪元导出以毫秒为单位的出发 ISODate,而不是到达的。
为此,我想到了应用自定义函数来进行转换:
// Here I can do any tranformation. I hope to replace the timestamp with the needed value
val doSomething: UserDefinedFunction = udf( (value: Seq[Timestamp]) => {
value.map(x => "doSomething" + x.getTime) }
)
val newDf = df.withColumn("flight.legs.departure",
doSomething(df.col("flight.legs.departure")))
但这只是返回一个全新的列,其中包含单个doSomething
字符串的数组。
{
"flight": {
"legs": [{
"arrival": "2020-10-30T14:47:00Z",
"departure": "2020-10-30T13:35:00Z"
}
],
"segments": [{
"arrival": "2020-10-30T14:47:00Z",
"departure": "2020-10-30T13:35:00Z",
}
]
},
"flight.legs.departure": ["doSomething1596268800000"]
}
和newDf.show(1)
+--------------------+---------------------+
| flight|flight.legs.departure|
+--------------------+---------------------+
|[[[182], 94, [202...| [doSomething15962...|
+--------------------+---------------------+
代替
{
...
"arrival": "2020-10-30T14:47:00Z",
//leg departure date that I changed
"departure": "doSomething1596268800000"
... // segments not affected in this example
"arrival": "2020-10-30T14:47:00Z",
"departure": "2020-10-30T13:35:00Z",
...
}
任何想法如何进行?
编辑 - 澄清:请记住,我的架构比上面显示的要复杂得多。例如,还有另一个顶级data
标签,flight
下面还有其他信息。然后在里面flight
,legs
还有segments
更多的元素,有些也是嵌套的。我只专注于我需要改变的那些。
我这么说是因为我想要最简单的可以扩展的解决方案。即理想情况下,只需更改所需元素而无需解构并重新构建整个嵌套结构。如果我们无法避免这种情况,那么使用案例类是最简单的解决方案吗?
解决方案
请检查下面的代码。
执行时间处理时间
With UDF
:所用时间:679 毫秒
Without UDF
:所用时间:1493 毫秒
Code With UDF
scala> :paste
// Entering paste mode (ctrl-D to finish)
// Creating UDF to update value inside array.
import java.text.SimpleDateFormat
val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'hh:mm:ss") // For me departure values are in string, so using this to convert sql timestmap.
val doSomething = udf((value: Seq[String]) => {
value.map(x => s"dosomething${dateFormat.parse(x).getTime}")
})
// Exiting paste mode, now interpreting.
import java.text.SimpleDateFormat
dateFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@41bd83a
doSomething: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,ArrayType(StringType,true),Some(List(ArrayType(StringType,true))))
scala> :paste
// Entering paste mode (ctrl-D to finish)
spark.time {
val updated = df.select("flight.*").withColumn("legs",arrays_zip($"legs.arrival",doSomething($"legs.departure")).cast("array<struct<arrival:string,departure:string>>")).select(struct($"segments",$"legs").as("flight"))
updated.printSchema
updated.show(false)
}
// Exiting paste mode, now interpreting.
root
|-- flight: struct (nullable = false)
| |-- segments: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- arrival: string (nullable = true)
| | | |-- departure: string (nullable = true)
| |-- legs: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- arrival: string (nullable = true)
| | | |-- departure: string (nullable = true)
+-------------------------------------------------------------------------------------------------+
|flight |
+-------------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, 2020-10-30T13:35:00]], [[2020-10-30T14:47:00, dosomething1604045100000]]]|
+-------------------------------------------------------------------------------------------------+
Time taken: 679 ms
scala>
Code Without UDF
scala> val df = spark.read.json(Seq("""{"flight": {"legs": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}],"segments": [{"departure": "2020-10-30T13:35:00","arrival": "2020-10-30T14:47:00"}]}}""").toDS)
df: org.apache.spark.sql.DataFrame = [flight: struct<legs: array<struct<arrival:string,departure:string>>, segments: array<struct<arrival:string,departure:string>>>]
scala> df.printSchema
root
|-- flight: struct (nullable = true)
| |-- legs: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- arrival: string (nullable = true)
| | | |-- departure: string (nullable = true)
| |-- segments: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- arrival: string (nullable = true)
| | | |-- departure: string (nullable = true)
scala> df.show(false)
+--------------------------------------------------------------------------------------------+
|flight |
+--------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, 2020-10-30T13:35:00]], [[2020-10-30T14:47:00, 2020-10-30T13:35:00]]]|
+--------------------------------------------------------------------------------------------+
scala> :paste
// Entering paste mode (ctrl-D to finish)
spark.time {
val updated= df
.select("flight.*")
.select($"segments",$"legs.arrival",$"legs.departure") // extracting legs struct column values.
.withColumn("departure",explode($"departure")) // exploding departure column
.withColumn("departure",concat_ws("-",lit("something"),$"departure".cast("timestamp").cast("long"))) // updating departure column values
.groupBy($"segments",$"arrival") // grouping columns except legs column
.agg(collect_list($"departure").as("departure")) // constructing list back
.select($"segments",arrays_zip($"arrival",$"departure").as("legs")) // construction arrival & departure columns using arrays_zip method.
.select(struct($"legs",$"segments").as("flight")) // finally creating flight by combining legs & segments columns.
updated.printSchema
updated.show(false)
}
// Exiting paste mode, now interpreting.
root
|-- flight: struct (nullable = false)
| |-- legs: array (nullable = true)
| | |-- element: struct (containsNull = false)
| | | |-- arrival: string (nullable = true)
| | | |-- departure: string (nullable = true)
| |-- segments: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- arrival: string (nullable = true)
| | | |-- departure: string (nullable = true)
+---------------------------------------------------------------------------------------------+
|flight |
+---------------------------------------------------------------------------------------------+
|[[[2020-10-30T14:47:00, something-1604045100]], [[2020-10-30T14:47:00, 2020-10-30T13:35:00]]]|
+---------------------------------------------------------------------------------------------+
Time taken: 1493 ms
scala>
推荐阅读
- c# - C# WPF 应用程序发布后不写入文件
- sql - 如果后续行具有相同的 ID,则标记“是/否”
- java - 是否可以使用 AIDL 发送 Uri?
- node.js - Postman 无法连接 NodeJS 服务器,即使它正在运行?
- jupyter-notebook - 重建 Jupyter notebook nbsignatures.db 信任数据库?
- database - PLS-00201:必须在 owbsys 用户上声明标识符“DBMS_AWM”
- reactjs - React Function 组件使用局部变量设置状态变量 V/s
- flutter - Flutter 实现自定义 BottomSheet
- android - onPerformSync 的捕获状态结束他的工作
- amazon-web-services - AWS Sagemaker 监控错误:编码不匹配