apache-kafka - 在 SparkStreaming 中暂停和恢复 KafkaConsumer
问题描述
:)
我已经在一个(奇怪的)情况下结束了自己,简单地说,我不想消耗来自 Kafka 的任何新记录,所以暂停主题中所有分区的 sparkStreaming 消耗(InputDStream[ConsumerRecord]),做一些操作和最后,恢复消费记录。
首先……这可能吗?
我一直在尝试这样的事情:
var consumer: KafkaConsumer[String, String] = _
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(java.util.Arrays.asList(topicName))
consumer.pause(consumer.assignment())
...
consumer.resume(consumer.assignment())
但我得到了这个:
println(s"Assigned partitions: $consumer.assignment()") --> []
println(s"Paused partitions: ${consumer.paused()}") --> []
println(s"Partitions for: ${consumer.partitionsFor(topicNAme)}") --> [Partition(topic=topicAAA, partition=0, leader=1, replicas=[1,2,3], partition=1, ... ]
欢迎任何帮助了解我缺少什么以及为什么在很明显消费者分配了分区时我得到空结果的任何帮助!
版本:Kafka:0.10 Spark:2.3.0 Scala:2.11.8
解决方案
是的,可以在代码中添加检查点并传递持久存储(本地磁盘、S3、HDFS)路径
并且每当您开始/恢复您的工作时,它都会从检查点获取带有消费者偏移量的 Kafka 消费者组信息,并从停止的位置开始处理。
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
Spark Check-=pointing 是一种机制,不仅可以保存偏移量,还可以保存 Stages 和 Jobs 的 DAG 的序列化状态。所以每当你用新代码重新开始你的工作时,它会
- 读取并处理序列化数据
- 如果您的 Spark 应用程序中有任何代码更改,请清理缓存的 DAG 阶段
- 使用最新代码从新数据恢复处理。
现在从磁盘读取只是Spark 加载 Kafka 偏移量、DAG 和旧的不完整处理数据所需的一次性操作。
一旦完成,它将始终按默认或指定的检查点间隔将数据保存到磁盘。
Spark 流提供了指定 Kafka 组 ID 的选项,但 Spark 结构化流没有。
推荐阅读
- python - 生成给定日期之间的季度列表
- python - 如何使用 Scrapy 进行 URL 抓取
- android-source - 当我在几个小时后运行 CTS 时,与设备的 adb 连接变得无响应
- javascript - 如何使用代理
使用与 T 不同的类型作为参数? - ios - 将数据从 MainView 传递到 ContainerView
- javascript - 如何使旧版 Firefox 支持 Angular 5 应用程序?
- ansible - Ansible 在运行用于创建符号链接的 shell 命令时卡住了
- python - 为什么 getAttribute() 没有给出硒的结果?
- ssl - 为在另一台服务器上注册域的站点安装 SSL 证书
- jquery - 仅获取带有自定义索引的选择