首页 > 解决方案 > Java Kafka Consumer 可以与新的 Consumer Group ID 一起正常工作,但是在关闭并重新启动时它不会消耗任何消息

问题描述

这可能是我缺乏理解,但令人沮丧。我有一个 Java Kafka 消费者,当它被分配一个新创建的 groupId 和 consumerID 时,它会很好地使用消息。但是,当在我的 Eclipse IDE 中停止 java 应用程序并使用相同的 groupID 和 consumerID 重新启动它时,它不会提取任何消息。如果我再次关闭应用程序,并为其分配新的和不同的 groupID/consumerID,它就可以正常工作。谁能帮我弄清楚为什么会这样?

下面的配置值

props.put("bootstrap.servers","192.168.5.0:30092,192.168.4.6:30092,192.168.5.8:30092");
props.put("acks", "all");
props.put("retries", 4);
props.put("batch.size", 1000);
props.put("linger.ms", 1);
props.put("buffer.memory", 335544323);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "Router2");
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "Consumer2");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "300000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

标签: javaapache-kafkakafka-consumer-api

解决方案


默认情况下,当消费者开始阅读消息时,它会提交已成功消费的消息的偏移量,以便不再消费它们。

每次创建新的消费者组时,消费者将从最早的可用偏移量开始消费(因为之前没有为新创建的消费者组提交任何偏移量)。

消费者从协调者收到分配后,必须确定每个分配分区的初始位置。首次创建组时,在消费任何消息之前,根据可配置的偏移重置策略 ( auto.offset.reset) 设置位置。通常,消费从最早的偏移量或最新的偏移量开始。

有关详细信息,请参阅 Confluent Docs 中的偏移管理


如果您想实现相同的行为,您可以简单地坚持同一个消费者组并简单地设置auto.offset.resetearliest而不是latest默认值:

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

或者

props.put("auto.offset.reset", "smallest")

(取决于您当前运行的版本)

这样,您的消费者将始终从最早的可用偏移量开始消费消息。

或者,您可以使用seekToBeginning()

kafkaConsumer.poll(0); // Heartbeat sent
kafkaConsumer.seekToBeginning(kafkaConsumer.assignment());
ConsumerRecords<String, String> records = kafkaConsumer.poll(5);

--from-beginning请注意,这与可与 Kafka 控制台消费者一起使用的标志具有相同的效果。


推荐阅读