首页 > 解决方案 > Spark 结构化流式传输 groupBy 不在附加模式下工作(在更新中工作)

问题描述

我正在尝试通过在附加输出模式下工作来获得流聚合/组,以便能够在流到流连接中使用生成的流。我正在开发 (Py)Spark 2.3.2,并且正在使用 Kafka 主题。

我的伪代码如下所示,在 Zeppelin 笔记本中运行

orderStream = spark.readStream().format("kafka").option("startingOffsets", "earliest").....

orderGroupDF = (orderStream
    .withWatermark("LAST_MOD", "20 seconds")
    .groupBy("ID", window("LAST_MOD", "10 seconds", "5 seconds"))
    .agg(
        collect_list(struct("attra", "attrb2",...)).alias("orders"),
        count("ID").alias("number_of_orders"),
        sum("PLACED").alias("number_of_placed_orders"),
        min("LAST_MOD").alias("first_order_tsd")
    )
)

debug = (orderGroupDF.writeStream
  .outputMode("append")
  .format("memory").queryName("debug").start()
)

在那之后,我希望数据出现在debug查询中,我可以从中进行选择(在 20 秒的延迟到达窗口过期之后。但调试查询中没有每个数据出现(我等了几分钟)

当我将输出模式更改update 为查询时立即工作。

任何暗示我做错了什么?

编辑:经过更多的实验,我可以添加以下内容(但我仍然不明白)。

启动 Spark 应用程序时,我使用的主题有很多旧数据(事件时间戳 << 当前时间)。启动后,它似乎读取了所有这些消息(例如,日志中的 MicroBatchExecution 报告“numRowsTotal = 6224”),但输出没有产生任何内容,并且来自 MicroBatchExecution 的日志中的 eventTime 水印停留在纪元(1970-01- 01)。

在 eventTimestamp 非常接近当前时间的输入主题上生成一条新消息后,查询立即输出所有“排队”记录,并在查询中碰撞 eventTime 水印。

我还可以看到时区似乎存在问题。我的 Spark 程序在 CET 中运行(当前为 UTC+2)。传入 Kafka 消息中的时间戳采用 UTC 格式,例如"LAST__MOD": "2019-05-14 12:39:39.955595000". 我已经设置了spark_sess.conf.set("spark.sql.session.timeZone", "UTC")。尽管如此,在输入主题上产生了“新”消息之后的微批量报告说

"eventTime" : {
  "avg" : "2019-05-14T10:39:39.955Z",
  "max" : "2019-05-14T10:39:39.955Z",
  "min" : "2019-05-14T10:39:39.955Z",
  "watermark" : "2019-05-14T10:35:25.255Z"
},

所以 eventTime 以某种方式与输入消息中的时间联系起来,但它是 2 小时。UTC 差异已被减去两次。此外,我看不到水印计算是如何工作的。鉴于我将其设置为 20 秒,我预计它会比最大事件时间早 20 秒。但显然它比它大 4 分 14 秒。我看不出这背后的逻辑。

我很困惑...

标签: apache-spark-sqlspark-structured-streaming

解决方案


似乎这与我使用的 Spark 版本 2.3.2 有关,更具体而言可能与SPARK-24156 有关。我已经升级到 Spark 2.4.3,在这里我立即得到了 groupBy 的结果(当然,在水印 lateThreshold 过期之后,但是“在预期的时间范围内”。


推荐阅读