首页 > 解决方案 > 如何解决spring kafka中的重新平衡问题或记录重复

问题描述

我总是担心 Kafka 1. 重复 2. 缺少记录

我在春季 Kafka 2.2.2.RELEASE 中进行了以下更改以解决上述问题。有人可以确认这是否正确。

public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
    factory.getContainerProperties().setAckOnError(false);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;

另外请确认,我是否需要在 ConcurrentKafkaListenerContainerFactory 中实现 ConsumerRebalanceListener。

如果需要如何实现。

   factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
        @Override
        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            //TODO code 
        }

        @Override
        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            //TODO code
        }
    });

标签: spring-kafka

解决方案


Kafka 提供 3 种交付保证:最多一次、至少一次、恰好一次。

如果您的数据仅位于 Kafka 中,并且您的处理结果写入 Kafka,您可以获得一次交付保证(请参阅此处的 Kafka Streams 库)。

如果您使用外部系统,例如其他数据库,Kafka 可以保证最多一次,至少一次。在这种情况下,您的应用程序必须确保从 Kafka 接收到的消息不会被处理两次。您的应用程序可以通过将所有已处理的消息保存在数据库中来做到这一点,当收到新消息时,应用程序将首先检查该消息是否已处理。

有关此内容的更多信息,请阅读:

Kafka 中的消息可靠性

卡夫卡消费者

编辑:

MAX_POLL_INTERVAL_MS_CONFIG 非常高。如果您的消息将被处理并保存在数据库中,30 秒就足够了。


推荐阅读