apache-spark - Spark 结构化流:如何合并新数据和结果
问题描述
我正在使用spark处理流数据,我需要将新数据与处理后的数据合并,例如,我对数据进行了分组和排序:
val groups = data
.groupBy("room_ID").agg(sort_array(collect_list(struct($"room_date",$"readout"))).as("subRecord"))
团体:
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | subRecord |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 1 |[[2013-12-14 00:00:00, 10], [2013-12-14 01:00:00, 15], [2013-12-14 02:00:00, 20], [2013-12-14 03:00:00, 5], [2013-12-14 04:00:00, 30], [2013-12-14 05:00:00, 35]]|
| 2 |[[2013-12-14 00:00:00, 30], [2013-12-14 01:00:00, 35], [2013-12-14 02:00:00, 45], [2013-12-14 03:00:00, 55], [2013-12-14 04:00:00, 65], [2013-12-14 05:00:00, 70]]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
我需要删除[2013-12-14 03:00:00, 5]
id:1 的记录,这意味着我必须将每个点与周围的点进行比较,所以我使用 udf 来执行此操作
val clearNoise:UserDefinedFunction = udf(
//do clear noise
)
val result = groups.withColumn("cleard",clear($"subRecord"))
结果:
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| id | cleard |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| 1 |[[2013-12-14 00:00:00, 10], [2013-12-14 01:00:00, 15], [2013-12-14 02:00:00, 20], [2013-12-14 04:00:00, 30], [2013-12-14 05:00:00, 35]]|
| 2 |[[2013-12-14 00:00:00, 30], [2013-12-14 01:00:00, 35], [2013-12-14 02:00:00, 45], [2013-12-14 03:00:00, 55], [2013-12-14 04:00:00, 65], [2013-12-14 05:00:00, 70]]|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
我的问题是,当新数据到来时,spark 会再次处理所有数据,如果是这样,我怎样才能合并新数据和结果中每个 id 的最新 2 点而不再次处理旧数据?换句话说,最后一批的输出是下一批的输入。提前谢谢。
解决方案
我假设您将为新数据创建一个新文件。如果两个 RDD 具有相同的结构,您可以将现有 RDD 与您将使用新文件创建的新 RDD 连接起来。
val MergedRDD = result.union(newRDD)
这会将两个 RDD 的结果合并为一个,而无需再次处理旧数据。
但是,如果您的传入数据是流式传输的,则可以更好地使用DStream
当您有流数据进入时,它是最好的。
推荐阅读
- python - LLC Compiler 未被识别为 cmdlet、函数、脚本文件或可运行程序的名称
- java - 十进制格式说“字符串不能转换为双精度”
- c++ - C++ Com 串口:写入失败
- websocket - Beyondcode laravel websockets周期性广播事件失败
- azure-cosmosdb - 如何确定哪些服务正在对 CosmosDB 数据库执行查询?
- php - 如何点击 Goutte 中的空白区域?
- caching - 页表和缓存
- sql - 使用 CASE 比较 DISTINCT(COUNT()) 和 COUNT() 列
- markdown - 在 Pandoc 中自定义 GFM 降价输出
- melt - 融化数据框中的多个列(变量)