java - Java Kafka Consumer 可以与新的 Consumer Group ID 一起正常工作,但是在关闭并重新启动时它不会消耗任何消息
问题描述
这可能是我缺乏理解,但令人沮丧。我有一个 Java Kafka 消费者,当它被分配一个新创建的 groupId 和 consumerID 时,它会很好地使用消息。但是,当在我的 Eclipse IDE 中停止 java 应用程序并使用相同的 groupID 和 consumerID 重新启动它时,它不会提取任何消息。如果我再次关闭应用程序,并为其分配新的和不同的 groupID/consumerID,它就可以正常工作。谁能帮我弄清楚为什么会这样?
下面的配置值
props.put("bootstrap.servers","192.168.5.0:30092,192.168.4.6:30092,192.168.5.8:30092");
props.put("acks", "all");
props.put("retries", 4);
props.put("batch.size", 1000);
props.put("linger.ms", 1);
props.put("buffer.memory", 335544323);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "Router2");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Consumer2");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "300000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
解决方案
默认情况下,当消费者开始阅读消息时,它会提交已成功消费的消息的偏移量,以便不再消费它们。
每次创建新的消费者组时,消费者将从最早的可用偏移量开始消费(因为之前没有为新创建的消费者组提交任何偏移量)。
消费者从协调者收到分配后,必须确定每个分配分区的初始位置。首次创建组时,在消费任何消息之前,根据可配置的偏移重置策略 (
auto.offset.reset
) 设置位置。通常,消费从最早的偏移量或最新的偏移量开始。
有关详细信息,请参阅 Confluent Docs 中的偏移管理。
如果您想实现相同的行为,您可以简单地坚持同一个消费者组并简单地设置auto.offset.reset
为earliest
而不是latest
默认值:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
或者
props.put("auto.offset.reset", "smallest")
(取决于您当前运行的版本)
这样,您的消费者将始终从最早的可用偏移量开始消费消息。
或者,您可以使用seekToBeginning()
:
kafkaConsumer.poll(0); // Heartbeat sent
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
ConsumerRecords<String, String> records = kafkaConsumer.poll(5);
--from-beginning
请注意,这与可与 Kafka 控制台消费者一起使用的标志具有相同的效果。
推荐阅读
- angular - 当我在请求中“返回”时,“无效”类型上不存在属性“订阅”
- python - 使用来自另一个字典的子集数据创建新字典
- gsm - AT 协议 - 呼叫应答或拒绝
- spring - Spring boot - 多对多关联不删除连接表数据
- android - 根据用户选择更改 API URL
- ionic5 - 由于浏览器安全限制,ionic 5 PDFTron Webviewer 无法访问文件 URL
- android - 协程 RoomDB NullPointerException
- flutter - 我有一个 ListTiles 列表,如何将它们放入我的抽屉中?(扑)
- java - 尝试在 Android Studio 中列出目录的文件
- java - 通过对对象使用方法来改进循环