首页 > 解决方案 > 在 SparkStreaming 中暂停和恢复 KafkaConsumer

问题描述

:)

我已经在一个(奇怪的)情况下结束了自己,简单地说,我不想消耗来自 Kafka 的任何新记录,所以暂停主题中所有分区的 sparkStreaming 消耗(InputDStream[ConsumerRecord]),做一些操作和最后,恢复消费记录。

首先……这可能吗?

我一直在尝试这样的事情:

var consumer: KafkaConsumer[String, String] = _    
consumer = new KafkaConsumer[String, String](properties)    
consumer.subscribe(java.util.Arrays.asList(topicName))

consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())

但我得到了这个:

println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]

欢迎任何帮助了解我缺少什么以及为什么在很明显消费者分配了分区时我得到空结果的任何帮助!

版本:Kafka:0.10 Spark:2.3.0 Scala:2.11.8

标签: apache-kafkaspark-streaming

解决方案


是的,可以在代码中添加检查点并传递持久存储(本地磁盘、S3、HDFS)路径

并且每当您开始/恢复您的工作时,它都会从检查点获取带有消费者偏移量的 Kafka 消费者组信息,并从停止的位置开始处理。

val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

Spark Check-=pointing 是一种机制,不仅可以保存偏移量,还可以保存 Stages 和 Jobs 的 DAG 的序列化状态。所以每当你用新代码重新开始你的工作时,它会

  1. 读取并处理序列化数据
  2. 如果您的 Spark 应用程序中有任何代码更改,请清理缓存的 DAG 阶段
  3. 使用最新代码从新数据恢复处理。

现在从磁盘读取只是Spark 加载 Kafka 偏移量、DAG 和旧的不完整处理数据所需的一次性操作。

一旦完成,它将始终按默认或指定的检查点间隔将数据保存到磁盘。

Spark 流提供了指定 Kafka 组 ID 的选项,但 Spark 结构化流没有。


推荐阅读