scala - Kafka 消费者在尝试使用 Spark 处理消息时多次消费消息
问题描述
我有一个 Kafka 消费者,它从一个主题读取消息并使用 spark 将其写入配置单元表。当我在 Yarn 上运行代码时,它会多次读取相同的消息。我在该主题中有大约 100,000 条消息。但是,我的消费者继续多次阅读相同的内容。当我做一个不同的时候,我得到了实际的计数。
这是我编写的代码。我想知道我是否缺少任何设置。
val spark = SparkSession.builder()
.appName("Kafka Consumer")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val kafkaConsumerProperty = new Properties()
kafkaConsumerProperty.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "---")
kafkaConsumerProperty.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaConsumerProperty.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaConsumerProperty.put(ConsumerConfig.GROUP_ID_CONFIG, "draw_attributes")
kafkaConsumerProperty.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
kafkaConsumerProperty.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val topic = "space_orchestrator"
val kafkaConsumer = new KafkaConsumer[String,String](kafkaConsumerProperty)
kafkaConsumer.subscribe(Collections.singletonList(topic))
while(true){
val recordSeq = kafkaConsumer.poll(10000).toSeq.map( x => x.value())
if(!recordSeq.isEmpty)
{
val newDf = spark.read.json(recordSeq.toDS)
newDf.write.mode(SaveMode.Overwrite).saveAsTable("dmart_dev.draw_attributes")
}
}
解决方案
作为替代方案,请尝试手动设置偏移量。为此,应禁用自动提交 ( enable.auto.commit = false
)。对于手动提交,KafkaConsumers 提供了两种方法,即commitSync()
和commitAsync()
。顾名思义,commitSync() 是一个阻塞调用,它会在成功提交偏移后返回,而 commitAsync() 会立即返回。
推荐阅读
- data-science - 主题建模评测:如何理解一个coherence value / c_v为0.4,是好是坏?
- java - 有没有办法从使用 StageStyle.UTILITY 的 JavaFX UI 中删除标题栏
- javascript - 如何在发送响应之前等待进程终止
- php - 如何修复反向代理服务器后面的wordpress站点的url
- c++ - 如何使用相同的函数 C++ 实例化多个线程
- ios - 从锁定屏幕接听电话后显示自定义 UI
- printing - Python:我的初始测试出现了奇怪的问题
- ios - 我似乎无法让我的 keyboardWillShow 工作?
- bootstrap-modal - 在打印模式下重复模态内容
- c - 如何在 Windows 7 上记录网络带宽?