Kafka messages from a topic are replayed after the consumer restarts

Problem description

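I have run into a strange problem with Kafka: after the consumer application restarts, all Kafka messages from the topic are replayed. Can anyone help me figure out what I am doing wrong here?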

This is my configuration:

spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.enable.auto.commit=false

My producer configuration:

        producerconfigs.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
        producerconfigs.put(ACKS_CONFIG, "all");
        producerconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "client.id");
        producerconfigs.put(RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        producerconfigs.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
        producerconfigs.put(TRANSACTIONAL_ID_CONFIG, "V1-"+ UUID.randomUUID().toString());

Consumer configuration:

        consumerconfigs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerconfigs.put(SESSION_TIMEOUT_MS_CONFIG, "10000");
        consumerconfigs.put("isolation.level", "read_committed");

Consumer code:

@KafkaListener(topics = "TOPIC-1", groupId = "TOPIC-GRP", containerFactory = "kaListenerContainerFactory",
        concurrency = "10", autoStartup = "true")
public String processMesage(@Payload String message,
        @Header(value = KafkaHeaders.CORRELATION_ID, required = false) String correlationId,
        @Header(value = KafkaHeaders.OFFSET, required = false) String offset) throws JsonProcessingException {
    // business logic goes here
}

Container code:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactoryString());
        factory.setBatchListener(true);
        return factory;
    }

Consumer properties:

    Map<String, Object> getConsumerProperties() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
        config.put(ConsumerConfig.GROUP_ID_CONFIG,
                environment.getProperty("spring.kafka.consumer.group-id"));
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                environment.getProperty("spring.kafka.consumer.auto-offset-reset"));
        config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
                environment.getProperty("spring.kafka.enable.auto.commit"));
        config.put(KEY_DESERIALIZER_CLASS_CONFIG,
                environment.getProperty("spring.kafka.consumer.key-deserializer"));
        config.put(VALUE_DESERIALIZER_CLASS_CONFIG,
                environment.getProperty("spring.kafka.consumer.value-deserializer"));
        config.put("isolation.level", "read_committed");
        return config;
    }

Application properties:

spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.enable.auto.commit=false

Tags: java, spring-boot, apache-kafka, kafka-consumer-api, kafka-producer-api

Solution
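
No confirmed answer is recorded here, so the following is only a place to start looking, not a diagnosis. `auto-offset-reset=earliest` applies only when the consumer group has no committed offset for a partition (or the committed offset is out of range). A full replay on every restart therefore suggests the group comes up without usable committed offsets. Note that `enable.auto.commit=false` does not by itself prevent commits under Spring Kafka, because the listener container commits offsets on the listener's behalf; it may be worth checking that the listener returns normally and that the same group id is in effect across restarts (the `groupId` on `@KafkaListener` takes precedence over the `group-id` property). The sketch below is a toy model in plain Java, not Kafka client code; the class and method names are hypothetical and exist only to illustrate the rule.

```java
import java.util.OptionalLong;

/** Toy model (not Kafka client code) of how a consumer picks its start offset. */
public class StartOffsetModel {

    /**
     * @param committed   last committed offset for the group and partition, if any
     * @param resetPolicy value of auto.offset.reset: "earliest" or "latest"
     * @param logStart    first offset still retained in the partition
     * @param logEnd      offset the next produced record would receive
     */
    public static long startPosition(OptionalLong committed, String resetPolicy,
                                     long logStart, long logEnd) {
        // A committed offset inside the retained range always wins.
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // Only without a usable committed offset does the reset policy apply.
        switch (resetPolicy) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                throw new IllegalStateException("no usable offset, policy: " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // Group has committed offset 100: a restart resumes at 100.
        System.out.println(startPosition(OptionalLong.of(100), "earliest", 0, 100));
        // No committed offset: "earliest" replays from the start of the log.
        System.out.println(startPosition(OptionalLong.empty(), "earliest", 0, 100));
    }
}
```

In this model the second call corresponds to the symptom in the question: every record is read again because nothing was committed for the group that restarted.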

