首页 > 解决方案 > 跟踪 Spark 结构化流中的消费消息

问题描述

我想设置配置,让我的应用程序跟踪来自 kafka 的消费消息。因此,每当它失败时,它可以从最后一次提交或消耗的偏移量开始选择。

readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "topic1")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .trigger(Trigger.Continuous("1 second"))  // only change in query
  .start();

我在网上读到checkpointlocation可以设置属性,火花可以使用它来跟踪偏移量。

想知道我可以在哪里设置这个属性?我可以在上面的代码中设置option吗?我可以知道如何正确设置它。

其次,我无法理解trigger(Trigger.Continuous("1 second"))财产。Docs 说continuous processing engine will record the progress of the query every second,它在阅读来自 kafka 的消息时记录了什么样的进度?

标签: javaapache-sparkapache-kafkaspark-structured-streaming

解决方案


您可以将检查点位置设置为您的选项writeStream

[...]
.writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .option("checkpointLocation", "/path/to/dir")
  .trigger(Trigger.Continuous("1 second"))
  .start();

从 Kafka 读取时跟踪进度意味着跟踪 TopicPartition 中消耗的偏移量。设置检查点位置将使您的应用程序能够将该信息作为 JSON 对象存储在给定路径中,例如

{
  "topic1":{
    "0":11, 
    "1":101
  }
}

这意味着应用程序已经消耗了 partition 中的 offset 10和topic0的 partition 中的 offset 100 。检查点是“提前”写入的(使用 write-ahead-logs),因此应用程序将继续从 Kafka 读取消息,该消息在预期或意外(失败)重新启动之前中断。1topic1

Trigger.ContinuousSpark 版本 2.3 开始可用。并且现在标记为实验性的。与微批处理方法相比,它会在 Kafka 中的每条消息到达主题后立即获取它,而无需尝试将其与其他消息进行批处理。这可以改善延迟,但很可能会降低您的整体吞吐量。

参数(例如1 seconds)确定检查点的频率。

使用此触发模式时,重要的是至少要有与主题分区一样多的可用内核。否则,申请将不会有任何进展。你可以在这里阅读更多关于它的信息:

“例如,如果您正在读取具有 10 个分区的 Kafka 主题,那么集群必须至少有 10 个内核才能使查询取得进展。”


推荐阅读