apache-spark-sql - Spark 结构化流式传输 groupBy 不在附加模式下工作(在更新中工作)
问题描述
我正在尝试通过在附加输出模式下工作来获得流聚合/组,以便能够在流到流连接中使用生成的流。我正在开发 (Py)Spark 2.3.2,并且正在使用 Kafka 主题。
我的伪代码如下所示,在 Zeppelin 笔记本中运行
orderStream = spark.readStream().format("kafka").option("startingOffsets", "earliest").....
orderGroupDF = (orderStream
.withWatermark("LAST_MOD", "20 seconds")
.groupBy("ID", window("LAST_MOD", "10 seconds", "5 seconds"))
.agg(
collect_list(struct("attra", "attrb2",...)).alias("orders"),
count("ID").alias("number_of_orders"),
sum("PLACED").alias("number_of_placed_orders"),
min("LAST_MOD").alias("first_order_tsd")
)
)
debug = (orderGroupDF.writeStream
.outputMode("append")
.format("memory").queryName("debug").start()
)
在那之后,我希望数据出现在debug
查询中,我可以从中进行选择(在 20 秒的延迟到达窗口过期之后。但调试查询中没有每个数据出现(我等了几分钟)
当我将输出模式更改update
为查询时立即工作。
任何暗示我做错了什么?
编辑:经过更多的实验,我可以添加以下内容(但我仍然不明白)。
启动 Spark 应用程序时,我使用的主题有很多旧数据(事件时间戳 << 当前时间)。启动后,它似乎读取了所有这些消息(例如,日志中的 MicroBatchExecution 报告“numRowsTotal = 6224”),但输出没有产生任何内容,并且来自 MicroBatchExecution 的日志中的 eventTime 水印停留在纪元(1970-01- 01)。
在 eventTimestamp 非常接近当前时间的输入主题上生成一条新消息后,查询立即输出所有“排队”记录,并在查询中碰撞 eventTime 水印。
我还可以看到时区似乎存在问题。我的 Spark 程序在 CET 中运行(当前为 UTC+2)。传入 Kafka 消息中的时间戳采用 UTC 格式,例如"LAST__MOD": "2019-05-14 12:39:39.955595000"
. 我已经设置了spark_sess.conf.set("spark.sql.session.timeZone", "UTC")
。尽管如此,在输入主题上产生了“新”消息之后的微批量报告说
"eventTime" : {
"avg" : "2019-05-14T10:39:39.955Z",
"max" : "2019-05-14T10:39:39.955Z",
"min" : "2019-05-14T10:39:39.955Z",
"watermark" : "2019-05-14T10:35:25.255Z"
},
所以 eventTime 以某种方式与输入消息中的时间联系起来,但它是 2 小时。UTC 差异已被减去两次。此外,我看不到水印计算是如何工作的。鉴于我将其设置为 20 秒,我预计它会比最大事件时间早 20 秒。但显然它比它大 4 分 14 秒。我看不出这背后的逻辑。
我很困惑...
解决方案
似乎这与我使用的 Spark 版本 2.3.2 有关,更具体而言可能与SPARK-24156 有关。我已经升级到 Spark 2.4.3,在这里我立即得到了 groupBy 的结果(当然,在水印 lateThreshold 过期之后,但是“在预期的时间范围内”。
推荐阅读
- azure-webapps - 如何监控 Azure WebApp 是否响应?
- javascript - 在javascript中计算范围对象的值
- salesforce - 如何通过 Salesforce 的 RestAPi 连接和检索雪花(数据库)
- python-3.x - 使用 python 将 azure Runbook 与 azure 中的文件共享连接起来
- excel - 无法在 Excel VBA 中使用 InputBox 进行过滤
- c++ - 如何在 ROS 中获取 hector_quadrotor 的 z 位置?
- typescript - 检查对象是否至少具有指定的属性
- java - 不兼容的类型:JSONLoader 无法转换为 Loader
- cmake - 使用 cmake 构建 Fortran 共享库的奇怪问题
- java - 从服务器上部署的 Spring Boot 应用程序中读取 API 元数据