spring-boot - 如何在 Kafka 消费者重试用尽时设置确认
问题描述
我有一个 Kafka 消费者,它重试了 5 次,我正在使用 Spring Kafka 和重试模板。现在,如果所有重试都失败了,那么在这种情况下如何确认工作。另外,如果我已将确认模式设置为手动,那么如何确认这些消息
消费者
@Bean("kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(RetryTemplate retryTemplate) {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setRetryTemplate(retryTemplate);
factory.setRecoveryCallback(context -> {
log.error("Maximum retry policy has been reached {}", context.getAttribute("record"));
Acknowledgment ack = (Acknowledgment) context.getAttribute(RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT);
ack.acknowledge();
return null;
});
factory.setConcurrency(Integer.parseInt(kafkaConcurrency));
return factory;
}
卡夫卡监听器
@KafkaListener(topics = "${kafka.topic.json}", containerFactory = "kafkaListenerContainerFactory")
public void recieveSegmentService(String KafkaPayload, Acknowledgment acknowledgment) throws Exception {
KafkaSegmentTrigger kafkaSegmentTrigger;
kafkaSegmentTrigger = TransformUtil.fromJson(KafkaPayload, KafkaSegmentTrigger.class);
log.info("Trigger recieved from segment service {}", kafkaSegmentTrigger);
processMessage(kafkaSegmentTrigger);
acknowledgment.acknowledge();
}
解决方案
如果您有一个RecoveryCallback
在重试用尽时处理记录,并且您没有使用手动确认,则容器将提交偏移量。
如果您使用手动确认,您可以在恢复回调中提交偏移量。
传递给回调的上下文对象有一个属性RetryingMessageListenerAdapter.CONTEXT_ACKNOWLEDGMENT
(“确认”)。
它也有CONTEXT_CONSUMER
和CONTEXT_RECORD
。
推荐阅读
- sql - Oracle DB 查询自定义顺序
- ubuntu - Puppet apt::source 在 Ubuntu 18.04 上添加不需要的“仿生”存储库条目
- windows - 如何通过命令删除远程机器中临时文件夹的内容
- php - PHP 多 cURL 请求延迟到超时
- matlab - 如何减小 MATLAB 运行时编译器大小以制作独立应用程序?
- python-2.7 - 将类的函数设置为回调时出现错误“没有给出足够的参数”
- linkedin - 从 LinkedIn API 响应中检索个人资料图像
- c++ - 如何生成“未来索引”伪随机数而不生成其前面的数字
- php - 如何将两种 CSS 类型添加到嵌入式 HTML 回显命令中并执行表单验证技术?
- windows - Winlogbeat 消息字段丢失