apache-spark - 没有流时的Spark结构化流窗口
问题描述
我想记录从 Spark 结构化流的传入流中读取到数据库的记录数。我正在使用 foreachbatch 转换传入的流批处理并写入所需的位置。如果在特定小时内没有记录,我想记录读取的 0 条记录。但是当没有流时,foreach 批处理不会执行。有人可以帮我吗?我的代码如下:
val incomingStream = spark.readStream.format("eventhubs").options(customEventhubParameters.toMap).load()
val query=incomingStream.writeStream.foreachBatch{
(batchDF: DataFrame, batchId: Long)=> writeStreamToDataLake(batchDF,batchId,partitionColumn,fileLocation,errorFilePath,eventHubName,configMeta)
}
.option("checkpointLocation",fileLocation+checkpointFolder+"/"+eventHubName)
.trigger(Trigger.ProcessingTime(triggerTime.toLong))
.start().awaitTermination()
解决方案
这就是它的工作方式,甚至只有当有一些东西要处理并因此流的状态发生变化时才会调用 StreamingQueryListener 的扩展。
可能还有另一种方法,但我会说“跳出框框思考”并在每个时间范围内预先填充 0,这样的数据库和查询 AGGRegate 时,您将得到正确的答案。
https://medium.com/@johankok/structured-streaming-in-a-flash-576cdb17bbee 可以提供一些见解以及 Spark:权威指南。
推荐阅读
- c# - Audit.Net MVC - 如何解决 NullReferenceException?
- reactjs - 为什么在输入更改时重新渲染功能组件中的所有元素
- javascript - moment.utc('date string').format('DD-MMM-YYYY') 返回上一个日期
- c# - 在不同的机器 C# 中生成和签署证书
- jquery - JQuery .nextUntil() 选择标题进入 li
- javascript - 如何创建多个popper实例?
- r - 如果 X 中的列为 FALSE,则删除 Y 中的相同列?
- r - 错误:`position` 必须是字符串或位置对象,而不是具有类标签的 S3 对象
- ios - registerUserNotificationSettings 没有触发代表并且推送通知无法正常工作我的项目
- java - 垃圾收集如何识别孤立对象?