首页 > 解决方案 > Spark 结构化流式处理 Kafka 微批量计数

问题描述

我正在使用 Spark 结构化流从 Kafka 主题中读取记录;我打算计算 Spark 中每个“微批次”中收到的记录数readstream

这是一个片段:

val kafka_df = sparkSession
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:port")
  .option("subscribe", "test-count")
  .load()

我从文档中了解到 kafka_df 将在 astreamingQuery启动(接下来)时被懒惰地评估,并且在评估时,它拥有一个微批次。所以,我想先做一个主题,然后做一个应该工作groupBycount

像这样:

val counter = kafka_df
             .groupBy("topic")
             .count()

现在要评估所有这些,我们需要一个 streaminQuery,比如说,一个控制台接收器查询以在控制台上打印它。这就是我看到问题的地方。DataFrames上的流式aggregate查询,例如kafka_df仅适用于outputMode complete/update而不适用于append

这实际上意味着,streamingQuery 报告的计数是累积的。

像这样:

 val counter_json = counter.toJSON   //to jsonify 
 val count_query = counter_json
                   .writeStream.outputMode("update")
                   .format("console")
                   .start()          // kicks of lazy evaluation
                   .awaitTermination()  

在受控设置中,其中:
实际发布记录:1500
实际接收微批次:3
实际接收记录:1500

每个微批次的计数应该是500,所以我希望(希望)查询打印到控制台:

主题:测试
计数:500
主题:测试
计数:500
主题:测试
计数:500

但事实并非如此。它实际上打印:

主题:测试
计数:500
主题:测试
计数:1000
主题:测试
计数:1500

我理解这是因为'outputMode'完成/更新(累积)

我的问题:是否有可能准确地得到每个微批次的计数是 Spark-Kafka 结构化流式传输?

从文档中,我发现了水印方法(支持追加):

val windowedCounts = kafka_df
                    .withWatermark("timestamp", "10 seconds")
                    .groupBy(window($"timestamp", "10 seconds", "10       seconds"), $"topic")
                    .count()

 val console_query = windowedCounts
                    .writeStream
                    .outputMode("append")
                    .format("console")
                    .start()
                    .awaitTermination()

但是这样的结果console_query是不准确的,而且看起来很离谱。

TL;DR - 任何关于准确计算 Spark-Kafka 微批处理中记录的想法都将不胜感激。

标签: apache-sparkapache-kafkaspark-structured-streamingspark-streaming-kafka

解决方案


“TL;DR - 任何关于准确计算 Spark-Kafka 微批处理中记录的想法都将不胜感激。”

您可以使用StreamingQueryListener( ScalaDocs ) 计算从 Kafka 获取的记录。

这允许您打印出从订阅的 Kafka 主题接收到的确切行数。该onQueryProgressAPI 在每个微批处理期间都会被调用,并包含有关您的查询的大量有用元信息。如果没有数据流入查询,则每 10 秒调用一次 onQueryProgress。下面是一个打印输入消息数量的简单示例。

spark.streams.addListener(new StreamingQueryListener() {
    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {}

    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {}

    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
      println("NumInputRows: " + queryProgress.progress.numInputRows)
    }
  })

如果您正在验证结构化流查询的性能,通常最好关注以下两个指标:

  • queryProgress.progress.inputRowsPerSecond
  • queryProgress.progress.processedRowsPerSecond

如果输入高于处理,您可能会增加作业资源或减少最大限制(通过减少 readStream 选项 maxOffsetsPerTrigger)。如果处理的更高,您可能需要增加此限制。


推荐阅读