首页 > 解决方案 > 如何在不分组的情况下每 5 分钟获取最近 1 小时的数据?

问题描述

如何每 5 分钟触发一次并获取最近 1 小时的数据?我想出了这个,但它似乎并没有在最后 1 小时内给我所有的行。我的理由是:

  1. 阅读流,

  2. 根据时间戳列过滤过去 1 小时的数据,以及

  3. 使用forEachbatch. 和

  4. 给它加水印,这样它就不会保留所有过去的数据。

     spark.
     readStream.format("delta").table("xxx")
       .withWatermark("ts", "60 minutes")
       .filter($"ts" > current_timestamp - expr("INTERVAL 60 minutes"))
     .writeStream
       .format("console")
       .trigger(Trigger.ProcessingTime("5 minutes"))
       .foreachBatch{ (batchDF: DataFrame, batchId: Long) =>  batchDF.collect().foreach(println)
            }
     .start()
    

还是我必须使用窗口?GroupBy但是如果我使用 Window 并且我不想分组,我似乎无法摆脱。

spark.
  readStream.format("delta").table("xxx")
    .withWatermark("ts", "1 hour")
    .groupBy(window($"ts", "1 hour"))
    .count()
 .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("5 minutes"))
    .foreachBatch{ (batchDF: DataFrame, batchId: Long) => 
         print("...entering foreachBatch...\n")
         batchDF.collect().foreach(println)
         }
 .start()

标签: apache-spark-sqlspark-streamingspark-structured-streamingdelta-lake

解决方案


如果您想在代码中安排处理,您应该使用外部调度程序(cron 等)或 API java.util.Timer ,而不是使用 spark 流每 5 分钟执行一次 spark 代码

为什么不应该使用 spark-streaming 来安排 spark 代码执行

如果你使用 spark-streaming 来调度代码,你会遇到两个问题。

第一个问题,spark-streaming 只处理一次数据。所以每 5 分钟,只加载新记录。您可以考虑使用窗口函数绕过此问题,并使用collect_list用户定义的聚合函数检索行的聚合列表,但随后您将遇到第二个问题。

第二个问题,虽然你的处理会每5分钟触发一次,但foreachBatch只有当有新的记录要处理时才会执行里面的函数。在两次执行之间的 5 分钟间隔内没有新记录,则不会发生任何事情。

总之,Spark Streaming 并非旨在安排 Spark 代码以特定时间间隔执行。

使用 java.util.Timer 的解决方案

因此,您应该使用调度程序,而不是使用火花流,或者使用外部调度程序,例如cronoozieairflow等......或者在您的代码中

如果您需要在代码中执行此操作,可以使用java.util.Timer,如下所示:

import org.apache.spark.sql.functions.{current_timestamp, expr}
import spark.implicits._

val t = new java.util.Timer()
val task = new java.util.TimerTask {
  def run(): Unit = {
    spark.read.format("delta").table("xxx")
      .filter($"ts" > (current_timestamp() - expr("INTERVAL 60 minutes")))
      .collect()
      .foreach(println)
  }
}
t.schedule(task, 5*60*1000L, 5*60*1000L) // 5 minutes
task.run()

推荐阅读