首页 > 解决方案 > 没有流时的Spark结构化流窗口

问题描述

我想记录从 Spark 结构化流的传入流中读取到数据库的记录数。我正在使用 foreachbatch 转换传入的流批处理并写入所需的位置。如果在特定小时内没有记录,我想记录读取的 0 条记录。但是当没有流时,foreach 批处理不会执行。有人可以帮我吗?我的代码如下:

val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()

val query=incomingStream.writeStream.foreachBatch{
  (batchDF: DataFrame, batchId: Long)=> writeStreamToDataLake(batchDF,batchId,partitionColumn,fileLocation,errorFilePath,eventHubName,configMeta)
}
              .option("checkpointLocation",fileLocation+checkpointFolder+"/"+eventHubName)
              .trigger(Trigger.ProcessingTime(triggerTime.toLong))
              .start().awaitTermination()

标签: apache-sparkspark-streamingspark-structured-streamingazure-databricks

解决方案


这就是它的工作方式,甚至只有当有一些东西要处理并因此流的状态发生变化时才会调用 StreamingQueryListener 的扩展。

可能还有另一种方法,但我会说“跳出框框思考”并在每个时间范围内预先填充 0,这样的数据库和查询 AGGRegate 时,您将得到正确的答案。

https://medium.com/@johankok/structured-streaming-in-a-flash-576cdb17bbee 可以提供一些见解以及 Spark:权威指南。


推荐阅读