apache-spark-sql - 如何在不分组的情况下每 5 分钟获取最近 1 小时的数据?
问题描述
如何每 5 分钟触发一次并获取最近 1 小时的数据?我想出了这个,但它似乎并没有在最后 1 小时内给我所有的行。我的理由是:
阅读流,
根据时间戳列过滤过去 1 小时的数据,以及
使用
forEachbatch
. 和给它加水印,这样它就不会保留所有过去的数据。
spark. readStream.format("delta").table("xxx") .withWatermark("ts", "60 minutes") .filter($"ts" > current_timestamp - expr("INTERVAL 60 minutes")) .writeStream .format("console") .trigger(Trigger.ProcessingTime("5 minutes")) .foreachBatch{ (batchDF: DataFrame, batchId: Long) => batchDF.collect().foreach(println) } .start()
还是我必须使用窗口?GroupBy
但是如果我使用 Window 并且我不想分组,我似乎无法摆脱。
spark.
readStream.format("delta").table("xxx")
.withWatermark("ts", "1 hour")
.groupBy(window($"ts", "1 hour"))
.count()
.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("5 minutes"))
.foreachBatch{ (batchDF: DataFrame, batchId: Long) =>
print("...entering foreachBatch...\n")
batchDF.collect().foreach(println)
}
.start()
解决方案
如果您想在代码中安排处理,您应该使用外部调度程序(cron 等)或 API java.util.Timer ,而不是使用 spark 流每 5 分钟执行一次 spark 代码
为什么不应该使用 spark-streaming 来安排 spark 代码执行
如果你使用 spark-streaming 来调度代码,你会遇到两个问题。
第一个问题,spark-streaming 只处理一次数据。所以每 5 分钟,只加载新记录。您可以考虑使用窗口函数绕过此问题,并使用collect_list或用户定义的聚合函数检索行的聚合列表,但随后您将遇到第二个问题。
第二个问题,虽然你的处理会每5分钟触发一次,但foreachBatch
只有当有新的记录要处理时才会执行里面的函数。在两次执行之间的 5 分钟间隔内没有新记录,则不会发生任何事情。
总之,Spark Streaming 并非旨在安排 Spark 代码以特定时间间隔执行。
使用 java.util.Timer 的解决方案
因此,您应该使用调度程序,而不是使用火花流,或者使用外部调度程序,例如cron、oozie、airflow等......或者在您的代码中
如果您需要在代码中执行此操作,可以使用java.util.Timer,如下所示:
import org.apache.spark.sql.functions.{current_timestamp, expr}
import spark.implicits._
val t = new java.util.Timer()
val task = new java.util.TimerTask {
def run(): Unit = {
spark.read.format("delta").table("xxx")
.filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
.collect()
.foreach(println)
}
}
t.schedule(task, 5*60*1000L, 5*60*1000L) // 5 minutes
task.run()
推荐阅读
- json - 在 Flutter 应用中,如何一次加载 JSON 数据,并在应用中随处使用?
- odata - OpenAPI - 如何记录应该属于“函数”类型的参数,如 OData 的 $apply
- drools - 在 Drools 中调用 removeRule 时的 NPE
- sorting - Google 表格 QUERY 和 FILTER SORT
- docker - Docker 将配置传递给 yaml 文件
- spring - Spring Mongo 使用多个集合执行分页/排序
- tensorflow - 使用 Colab GPU 和我的 Mac 进行 tensorflow 计算
- cron - NestJS中同时执行cron
- html - 关于位置的 HTML/CSS:相对
- celery - 如何获取芹菜队列的消费者数量