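scala - Spark Structured Streaming - real-time aggregation over the last X hours of data
Problem description
I am new to Spark, and I am using Spark Structured Streaming in Scala to read a data stream from Kafka.
I want to aggregate the last X hours of data with Apache Spark and, if possible, write only the updates to the sink.
For example, suppose I want the minimum price for customer ID1 over the past hour, given the following events: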
Events Data:
+------------------------------------------+-------------------------+---------+
|event_time |customer |price |
+------------------------------------------+-------------------------+---------+
| 2021-03-09 11:00:00 |ID1 |2000 |
| 2021-03-09 11:28:00 |ID1 |1500 |
| 2021-03-09 15:20:00 |ID1 |2500 |
+------------------------------------------+-------------------------+---------+
at 2021-03-09 11:00:00 desired output (data between 10:00:00 and 11:00:00) :
+-------------------------+------------+
|customer |min_price |
+-------------------------+------------+
|ID1 |2000 |
+-------------------------+------------+
at 2021-03-09 11:28:00 desired output (data between 10:28:00 and 11:28:00):
+-------------------------+------------+
|customer |min_price |
+-------------------------+------------+
|ID1 |1500 |
+-------------------------+------------+
at 2021-03-09 15:20:00 desired output (data between 14:20:00 and 15:20:00):
+-------------------------+------------+
|customer |min_price |
+-------------------------+------------+
|ID1 |2500 |
+-------------------------+------------+
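To make the desired semantics concrete, here is a minimal plain-Scala sketch (no Spark involved) that computes, for each event, the minimum price per customer over the hour ending at that event's time. The names `PriceEvent`, `TrailingMinDemo`, and `trailingMin` are hypothetical and exist only for illustration; the window is assumed to be inclusive at both ends.

```scala
import java.time.{Duration, LocalDateTime}

// Hypothetical record type mirroring the columns of the events table.
final case class PriceEvent(eventTime: LocalDateTime, customer: String, price: Long)

object TrailingMinDemo {
  // For every event, return (event time, customer, minimum price) where the
  // minimum is taken over that customer's events in [eventTime - window, eventTime].
  def trailingMin(events: Seq[PriceEvent], window: Duration): Seq[(LocalDateTime, String, Long)] =
    events.map { e =>
      val start = e.eventTime.minus(window)
      val inWindow = events.filter { o =>
        o.customer == e.customer &&
          !o.eventTime.isBefore(start) &&
          !o.eventTime.isAfter(e.eventTime)
      }
      // inWindow always contains e itself, so min is safe here.
      (e.eventTime, e.customer, inWindow.map(_.price).min)
    }

  def main(args: Array[String]): Unit = {
    val events = Seq(
      PriceEvent(LocalDateTime.parse("2021-03-09T11:00:00"), "ID1", 2000L),
      PriceEvent(LocalDateTime.parse("2021-03-09T11:28:00"), "ID1", 1500L),
      PriceEvent(LocalDateTime.parse("2021-03-09T15:20:00"), "ID1", 2500L)
    )
    trailingMin(events, Duration.ofHours(1)).foreach {
      case (time, customer, minPrice) => println(s"$time $customer $minPrice")
    }
  }
}
```

For the three sample events this yields minimum prices of 2000, 1500, and 2500, matching the desired outputs above. Note that the window is anchored to the latest event time, not to the wall clock.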
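Instead, the output stays at 1500. I tried filtering the input stream to the last hour. I also tried a sliding window, but that produces too many windows; I only need the last one, the window that ends at the latest event_time.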
import org.apache.spark.sql.functions.{current_timestamp, expr, min, window}
import spark.implicits._ // enables $"col" and .as[(String, String)]

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topics)
  .option("startingOffsets", "latest")
  .load()
val ds1 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)").as[(String, String)]
// some more transformation ds1 => data

// Attempt 1: filter to the last hour, then aggregate per customer
val filteredData = data.filter($"event_time" > (current_timestamp() - expr("INTERVAL 1 hours")))
val filteredResults = filteredData.groupBy($"customer").agg(min("price").alias("min_price"))

// Attempt 2: sliding window (1 hour long, sliding every 5 minutes)
val windowedResults = filteredData
  .groupBy(window($"event_time", "1 hours", "5 minutes"), $"customer")
  .agg(min("price").alias("min_price"))
For testing purposes, I am writing to the console.
Is this possible in Spark Structured Streaming?
Solution
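One possible approach, offered as an untested sketch and not as a confirmed answer, is to keep each customer's recent events in arbitrary state with `flatMapGroupsWithState` and to recompute the minimum over the hour ending at that customer's latest event time. The case classes `Event`, `MinPrice`, and `Recent`, the object `TrailingMin`, and the `Long` price type are assumptions made for illustration; the question does not show the actual schema of `data`.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Assumed schema: field names match the question's columns, types are a guess.
case class Event(event_time: Timestamp, customer: String, price: Long)
case class MinPrice(customer: String, min_price: Long)
// Per-customer state: the events that are still inside the trailing window.
case class Recent(events: Seq[Event])

object TrailingMin {
  val windowMs: Long = 60L * 60L * 1000L // 1 hour

  // Called once per customer per micro-batch with that customer's new events.
  def update(customer: String, batch: Iterator[Event], state: GroupState[Recent]): Iterator[MinPrice] = {
    val all = state.getOption.map(_.events).getOrElse(Seq.empty[Event]) ++ batch
    if (all.isEmpty) Iterator.empty
    else {
      // Anchor the window to the latest event time seen for this customer.
      val latest = all.map(_.event_time.getTime).max
      val kept = all.filter(_.event_time.getTime >= latest - windowMs)
      state.update(Recent(kept)) // older events are dropped from state
      Iterator(MinPrice(customer, kept.map(_.price).min))
    }
  }

  def apply(spark: SparkSession, events: Dataset[Event]): Dataset[MinPrice] = {
    import spark.implicits._ // encoders for the case classes above
    events
      .groupByKey(_.customer)
      .flatMapGroupsWithState[Recent, MinPrice](
        OutputMode.Update(), GroupStateTimeout.NoTimeout())(update)
  }
}
```

Assuming `data` can be converted with `data.as[Event]`, the result of `TrailingMin(spark, data.as[Event])` could be written with `.writeStream.outputMode("update").format("console").start()`, so that only customers with new events are emitted. This sketch emits one row per customer per micro-batch (so several events arriving in the same batch produce a single update), does not handle late or out-of-order events specially, and keeps up to one hour of events per customer in state.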