java - spring kafka 偏移增量甚至自动提交偏移设置为 false
问题描述
我正在尝试为manual offset commit
收到的消息实施kafka
. 我已将偏移提交设置为false
,但偏移值不断增加。
不知道是什么原因。需要帮助解决问题。
下面是代码
应用程序.yml
spring:
application:
name: kafka-consumer-sample
resources:
cache:
period: 60m
kafka:
bootstrapServers: localhost:9092
options:
enable:
auto:
commit: false
KafkaConfig.java
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> config = new HashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return new DefaultKafkaConsumerFactory<>(config);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
return factory;
}
KafkaConsumer.java
@Service
public class KafkaConsumer {
@KafkaListener(topics = "#{'${kafka-consumer.topics}'.split(',')}", groupId = "${kafka-consumer.groupId}")
public void consume(ConsumerRecord<String, String> record) {
System.out.println("Consumed Kafka Record: " + record);
record.timestampType();
System.out.println("record.timestamp() = " + record.timestamp());
System.out.println("***********************************");
System.out.println(record.timestamp());
System.out.println("record.key() = " + record.key());
System.out.println("Consumed String Message : " + record.value());
}
}
输出如下
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 31, CreateTime = 1573570989565, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 10)
record.timestamp() = 1573570989565
***********************************
1573570989565
record.key() = null
Consumed String Message : 10
Consumed Kafka Record: ConsumerRecord(topic = test, partition = 0, offset = 32, CreateTime = 1573570991535, serialized key size = -1, serialized value size = 2, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = 11)
record.timestamp() = 1573570991535
***********************************
1573570991535
record.key() = null
Consumed String Message : 11
属性如下。
auto.commit.interval.ms = 100000000
auto.offset.reset = earliest
bootstrap.servers = [localhost:9092]
check.crcs = true
connections.max.idle.ms = 540000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = mygroup
heartbeat.interval.ms = 3000
这是在我重新启动消费者之后。我希望早期的数据也会被打印出来。
我的理解正确吗?请注意,我正在重新启动我的 springboot 应用程序,希望消息从第一个开始。并且我的 kafka 服务器和 zookeeper 没有终止。
解决方案
如果auto
使用此属性禁用确认ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
,则必须将容器级别的确认模式设置为MANUAL
并且不要提交,offset
因为默认情况下它设置为BATCH.
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}
因为当禁用自动确认时,容器级别的确认设置为BATCH
公共无效 setAckMode(ContainerProperties.AckMode ackMode)
设置自动确认(在配置属性中)为 false 时使用的确认模式。
- RECORD:每条记录传递给侦听器后的确认。
- BATCH:从消费者接收到的每批记录都传递给侦听器后的确认
- TIME:在此毫秒数后确认;(应该大于#setPollTimeout(long) pollTimeout。
- COUNT:至少收到此数量的记录后确认
- MANUAL:Listener 负责确认 - 使用 AcknowledgeingMessageListener。
参数:
ackMode - ContainerProperties.AckMode;默认批处理。
为提交偏移量提供了几个选项。如果 enable.auto.commit 消费者属性为 true,Kafka 会根据其配置自动提交偏移量。如果为 false,则容器支持多个 AckMode 设置(在下一个列表中描述)。默认的 AckMode 是 BATCH。从 2.3 版开始,框架将 enable.auto.commit 设置为 false,除非在配置中明确设置。以前,如果未设置该属性,则使用 Kafka 默认值 (true)。
如果您想始终从头开始阅读,则必须将此属性设置auto.offset.reset
为earliest
config.put(ConsumerConfig. AUTO_OFFSET_RESET_CONFIG, "earliest");
注意:确保groupId
必须是在 kafka 中没有任何偏移的新的
推荐阅读
- android - Android:列表离开屏幕时的RecyclerView布局问题
- azure - 查询 Azure App Services 是否有备份设置
- c# - C# 找不到带有 GraphQL 的包
- javascript - Chart.js:如何设置图表的最小高度但仍保持纵横比?
- c++ - use of flow operators on objects
- javascript - 我需要帮助更新此数组状态以显示为表数据
- type-conversion - 如何在 AutoHotkey 中将变量转换为布尔值?
- postgresql - 空闲事务 mybatis jboss 6.4 postgres 9.6
- python - 使用 Python sqlparse 获取查询树/层次结构
- excel - Excel 连接到 SQL Server:连接到不同 SQL 驱动程序的区别