java - 基于 Spring Boot 的 Kafka 消费者确认策略
问题描述
我们有一个基于 Sprring-Boot 的 Kaffka 消费者,我们为此创建了一个这样的工厂:-
@Bean
public ConsumerFactory<String, Customer> customerConsumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bootstrapServers");
config.put(ConsumerConfig.GROUP_ID_CONFIG, "${kafka.customer.consumer.group}");
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "5000");
config.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000");
config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"25000");
config.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,String.valueOf(Integer.MAX_VALUE));
config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(),
new JsonDeserializer<>(PaymentsHubResponse.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Customer> customerConsumerKafkaListenerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Customer> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(customerConsumerFactory());
return factory;
}
通过上述配置,我们打算在一次轮询中,该消费者最多读取 2 条记录,并且我们有手动确认策略。现在,消费者的代码如下所示:-
@KafkaListener(topics = "${kafka.consumer.topic}", groupId = "${kafka.consumer.group}", containerFactory="customerConsumerKafkaListenerFactory")
public void consumeResponseEventFromPH(Customer customerObject, Acknowledgment ack) {
acknowledgment.acknowledge();
// Business Logic.
}
问题 1 .) 此语句是否会acknowledgment.acknowledge();
同时向 Kafka 代理发送两个消息的确认,或者此方法本身是否会执行 2 次,每个传入消息执行一次?
问题 2.) 如果在处理这些消息的过程中出现问题怎么办?这些信息会永远丢失吗?
问题 3.) 有没有办法在每个消息级别发送条件确认?
问题 4.) 说,我从不承认信息?那么,有多少次,这条消息会从 Broker 再次传出?
MAX_POLL_RECORDS_DOC
问题 5.) 这两个 consumerConfig 属性和有什么区别MAX_POLL_RECORDS_CONFIG
?
答案将不胜感激。
- 谢谢阿迪亚
解决方案
Acknowledgment
只有在容器确认模式为MANUAL
(在处理完两个偏移后都提交)或MANUAL_IMMEDIATE
立即提交每个偏移(同步或异步取决于commitSync
属性)时,您才会得到一个。取决于版本;对于旧版本,错误只是被记录下来。对于最新版本,默认错误处理程序是
SeekToCurrentErrorHandler
. 默认情况下,将无延迟地尝试交付 10 次,然后记录。您可以配置一个在重试用尽后调用的恢复器(例如DeadLetterPublishingRecoverer
.不; Kafka 只维护一个偏移量;不承认离散记录。
除非您抛出异常(参见 2),否则它不会被重新交付。请参阅有关错误处理的参考手册。https://docs.spring.io/spring-kafka/docs/current/reference/html/#annotation-error-handling
一个 (_DOC) 是属性文档的文本,另一个 (_CONFIG) 是属性名称。
推荐阅读
- blockchain - 安装 startport mv:无法将“./starport”移动到“/usr/local/bin/starport”:权限被拒绝
- excel - 在不同的电子表格中使用相同的宏
- sql-server - 在表更新之前收集流运算符导致串行更新导致 SQL Server 2017 中长时间运行的查询
- couchbase - 如何链接调用
- php - Laravel API 接受输入并返回输出
- date - 如何将openweather unix时间戳转换为正确的日期
- python - 暂停在 redis-server 上的队列中执行 RQ 作业
- concurrency - CUDA 并行过滤整数
- p5.js - 椭圆边界内的粒子 p5.js
- sql - Postgresql多连接,查询末尾添加where子句后查询慢