首页 > 解决方案 > Spark 结构化流:如何合并新数据和结果

问题描述

我正在使用spark处理流数据,我需要将新数据与处理后的数据合并,例如,我对数据进行了分组和排序:

val groups = data
.groupBy("room_ID").agg(sort_array(collect_list(struct($"room_date",$"readout"))).as("subRecord"))

团体:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|  id  | subRecord                                                                                                                                                        |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|  1   |[[2013-12-14 00:00:00, 10], [2013-12-14 01:00:00, 15], [2013-12-14 02:00:00, 20], [2013-12-14 03:00:00,  5], [2013-12-14 04:00:00, 30], [2013-12-14 05:00:00, 35]]|
|  2   |[[2013-12-14 00:00:00, 30], [2013-12-14 01:00:00, 35], [2013-12-14 02:00:00, 45], [2013-12-14 03:00:00, 55], [2013-12-14 04:00:00, 65], [2013-12-14 05:00:00, 70]]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

我需要删除[2013-12-14 03:00:00, 5]id:1 的记录,这意味着我必须将每个点与周围的点进行比较,所以我使用 udf 来执行此操作

val clearNoise:UserDefinedFunction = udf(
//do clear noise
)
val result = groups.withColumn("cleard",clear($"subRecord"))

结果:

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|  id  | cleard                                                                                                                                                        |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|  1   |[[2013-12-14 00:00:00, 10], [2013-12-14 01:00:00, 15], [2013-12-14 02:00:00, 20], [2013-12-14 04:00:00, 30], [2013-12-14 05:00:00, 35]]|
|  2   |[[2013-12-14 00:00:00, 30], [2013-12-14 01:00:00, 35], [2013-12-14 02:00:00, 45], [2013-12-14 03:00:00, 55], [2013-12-14 04:00:00, 65], [2013-12-14 05:00:00, 70]]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

我的问题是,当新数据到来时,spark 会再次处理所有数据,如果是这样,我怎样才能合并新数据和结果中每个 id 的最新 2 点而不再次处理旧数据?换句话说,最后一批的输出是下一批的输入。提前谢谢。

标签: apache-sparkmergespark-structured-streaming

解决方案


我假设您将为新数据创建一个新文件。如果两个 RDD 具有相同的结构,您可以将现有 RDD 与您将使用新文件创建的新 RDD 连接起来。

val MergedRDD = result.union(newRDD)

这会将两个 RDD 的结果合并为一个,而无需再次处理旧数据。

但是,如果您的传入数据是流式传输的,则可以更好地使用DStream

当您有流数据进入时,它是最好的。


推荐阅读