java - 消费者重启后正在重播来自主题的 Kafka 消息
问题描述
我在 kafka 中遇到了一个奇怪的问题,即在消费者应用程序重新启动后,来自主题的所有 kafka 消息都被重播。谁能帮我我在这里做错了什么?
这是我的配置:
spring.kafka.consumer.auto-offset-reset=最早
spring.kafka.enable.auto.commit=false
我的生产者配置:
producerconfigs.put(ENABLE_IDEMPOTENCE_CONFIG, "true");
producerconfigs.put(ACKS_CONFIG, "all");
producerconfigs.put(ProducerConfig.CLIENT_ID_CONFIG, "client.id");
producerconfigs.put(RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
producerconfigs.put(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
producerconfigs.put(TRANSACTIONAL_ID_CONFIG, "V1-"+ UUID.randomUUID().toString());
消费者配置:
consumerconfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerconfigs.put(SESSION_TIMEOUT_MS_CONFIG, "10000");
consumerconfigs.put("isolation.level", "read_committed");
消费者代码:
@KafkaListener(topics = "TOPIC-1", groupId = "TOPIC-GRP", containerFactory = "kaListenerContainerFactory",concurrency = "10", autoStartup = "true")
public String processMesage(@Payload String message,@Header(value = KafkaHeaders.CORRELATION_ID, required = false) String correlationId,@Header(value = KafkaHeaders.OFFSET, required = false) String offset) throws JsonProcessingException {//business logic goes here }
容器代码
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryString());
factory.setBatchListener(true);
return factory;
}
消费者配置
Map<String, Object> getConsumerProperties() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
environment.getProperty("spring.kafka.consumer.bootstrap-servers"));
config.put(ConsumerConfig.GROUP_ID_CONFIG,
environment.getProperty("spring.kafka.consumer.group-id"));
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
environment.getProperty("spring.kafka.consumer.auto-offset-reset"));
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
environment.getProperty("spring.kafka.enable.auto.commit"));
config.put(KEY_DESERIALIZER_CLASS_CONFIG,
environment.getProperty("spring.kafka.consumer.key-deserializer"));
config.put(VALUE_DESERIALIZER_CLASS_CONFIG,
environment.getProperty("spring.kafka.consumer.value-deserializer"));
config.put("isolation.level", "read_committed");
return config;
}
应用程序属性
spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.consumer.group-id=consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.enable.auto.commit= false
解决方案
推荐阅读
- sql - Access sql 将数字转换为 - 或 . 为保密
- css - 使用媒体查询时,Div 不适合浏览器的宽度
- php - 在 for 循环语句中添加分页链接的正确方法是什么
- qtp - UFT/QTP 无法单击切换按钮
- reporting-services - SSRS 将字符串 12 小时时间转换为 24 小时时间
- angular - 未找到 AngularCompilerPlugin
- ionic-framework - 更新数据时刷新离子选择选项的问题
- ruby-on-rails - Rails 在 form_with 中包含一个 select 标记
- java - 在带有流的列表中查找距离目标 n 步的值
- angularjs - 使用角度更新firebase中的对象