apache-kafka - Spring kafka 不重试未提交的偏移量
问题描述
我怎样才能停止 spring kafka 不要重试未阅读主题的消息。例如,我杀死应用程序然后重新启动它,我的消费者开始消费未消费的消息。我该如何预防?
@Bean
public ConsumerFactory<String, String> manualConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(configs);
}
/**
* Kafka manual ack listener container factory kafka listener container factory.
*
* @return the kafka listener container factory
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaManualAckListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(manualConsumerFactory());
ContainerProperties props = factory.getContainerProperties();
props.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Override
@EventListener
public void processSettlementFile(final Notification notification) {
LOG.info("Handling message [{}]", notification);
try {
final Map<String, JobParameter> parameters = new HashMap<>();
parameters.put("fileName", new JobParameter("1-101-D-2017-212-volume-per-transaction.csv"));
parameters.put("bucket", new JobParameter("bucket-name-can-be-passed-also-from-kafka-todo"));
final JobParameters jobParameters = new JobParameters(parameters);
final JobExecution execution = jobLauncher.run(succeededTransactionCsvFileToDatabaseJob, jobParameters);
LOG.info("Job Execution Status: " + execution.getStatus());
} catch (JobExecutionAlreadyRunningException | JobRestartException | JobInstanceAlreadyCompleteException | JobParametersInvalidException e) {
LOG.error("Failed to process job..", e);
}
}
@KafkaListener(topics = "topic", groupId = "processor-service", clientIdPrefix = "string", containerFactory = "kafkaManualAckListenerContainerFactory")
public void listenAsString(@Payload final String payload, Acknowledgment acknowledgment, final ConsumerRecord<String, String> consumerRecord) throws TopicEventException {
applicationEventPublisher.publishEvent(object);
acknowledgment.acknowledge();
}
解决方案
您可以将 a 添加ConsumerAwareRebalanceListener
到容器配置中并调用consumer.seekToEnd(partitions)
.onPartitionsAssigned()
推荐阅读
- c# - 无法以编程方式在 Treeview 中设置自定义 TreeView 的选定项
- python - ValueError:检查输入时出错:预期dense_1_input的形状为(3000,)但得到的数组形状为(1,)
- git - eGit“比较”和“替换为”选项
- django - 31/5000 如何插入新用户?
- ffmpeg - 将带有 -filter_complex 的水印覆盖添加到多个输出(破折号)
- c++ - 在 SFINAE 的 void_t 中使用带有参数的方法
- python - 如何将阿拉伯语写入 CSV 文件
- sql-server - 使用 SQL Server 2019/Visual Studio 2019 创建 SSIS 包
- javascript - 带有事件循环的 AJAX 请求
- node.js - mongoose .find() 范围:如何从查询中调用找到的数据?