spring-boot - kafka 什么时候会重试处理未确认的消息?
问题描述
我有一个配置了手动 ACK 属性的消费者:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MessageAvro> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, MessageAvro> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
factory.setConsumerFactory(consumerFactory());
return factory;
}
还有一个使用@KafkaListener 方法的消费者,它做了一些工作,比如:
@KafkaListener(
topics = "${tpd.topic-name}",
containerFactory = "kafkaListenerContainerFactory",
groupId = "${tpd.group-id}")
public void messageListener(final ConsumerRecord<String, MessageAvro> msg, @Payload final MessageAvro message, final Acknowledgment ack) {
if (someCondition) {
// do something
ack.acknowledge();
} else {
// do not acknoledge the message here in order to retry it later.
}
}
如果条件为“假”并且我们继续进行“其他”部分,我的消费者何时会尝试再次阅读未确认的消息?
如果它不再这样做,我如何告诉我的@KafkaListener 考虑未确认的消息?
解决方案
一旦您提交(或“确认”)一个偏移量,所有先前的偏移量也会在某种意义上被提交,即 ConsumerGroup不会尝试再次读取它。
这意味着:如果您达到“else”条件并且您的作业继续以某种方式运行,它将达到“if”条件并确认所有偏移量都将被提交。
这背后的原因是 Kafka 消费者将报告给下一个要读取的偏移量的代理。为此,Kafka 将该信息存储在称为__consumer_offsets
键/值对的内部 Kafka 主题中,其中键:ConsumerGroup,主题名称,分区值:要读取的下一个偏移量
该内部主题是一个压缩主题,这意味着它最终只会存储提到的键的最新值。因此,Kafka 不会跟踪介于两者之间的“未确认”消息。
解决方法
人们通常所做的是将那些“未确认”的消息分叉到另一个主题中,以便稍后可以一起检查和使用它们。这样,您就不会阻止您的实际应用程序使用更多消息,并且您可以单独处理未确认的消息。
推荐阅读
- php - 我试图在laravel中打印我的数据变量中的count值,以下是变量的值
- scheme - Miller-Rabin 测试 (SICP 1.28)
- python - 将 JSON 文件转换为具有定义格式的 Dataframe
- r - 在数据框中移动数据
- c++ - 如何将类对象返回到空状态以便可以重用?
- elixir - 将项目模块导入 IEX 会话
- tkinter - 如何让 tkinter 按钮在嵌套函数中销毁/退出根窗口?
- python - opencv去除边缘的白色像素
- java - 当您从子类调用构造函数中的方法时会发生什么?
- unity3d - 当通过相机看到时,有没有办法改变单位深度物体的颜色