spring-kafka - 如何解决spring kafka中的重新平衡问题或记录重复
问题描述
我总是担心 Kafka 1. 重复 2. 缺少记录
我在春季 Kafka 2.2.2.RELEASE 中进行了以下更改以解决上述问题。有人可以确认这是否正确。
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
return factory;
另外请确认,我是否需要在 ConcurrentKafkaListenerContainerFactory 中实现 ConsumerRebalanceListener。
如果需要如何实现。
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
//TODO code
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
//TODO code
}
});
解决方案
Kafka 提供 3 种交付保证:最多一次、至少一次、恰好一次。
如果您的数据仅位于 Kafka 中,并且您的处理结果写入 Kafka,您可以获得一次交付保证(请参阅此处的 Kafka Streams 库)。
如果您使用外部系统,例如其他数据库,Kafka 可以保证最多一次,至少一次。在这种情况下,您的应用程序必须确保从 Kafka 接收到的消息不会被处理两次。您的应用程序可以通过将所有已处理的消息保存在数据库中来做到这一点,当收到新消息时,应用程序将首先检查该消息是否已处理。
有关此内容的更多信息,请阅读:
编辑:
MAX_POLL_INTERVAL_MS_CONFIG 非常高。如果您的消息将被处理并保存在数据库中,30 秒就足够了。
推荐阅读
- iptables - strongswan ipsec 无法 ping 子网
- python - 视图 profile.views.ProfileDetailComment 没有返回 HttpResponse 对象。它返回 None 而不是
- elasticsearch - Elasticsearch 6.7 _reindex 操作不一致地报告映射器错误
- c - 使用 MinGW gcc 编译 C 时,为什么只指定 obj 名称而不使用 $(OBJS)?
- python - 为什么我的布尔值在while循环中没有变化?
- mysql - 从规范化表中选择数据
- javascript - 用户在'beforeunload'之后取消加载
- python - 如何从我的 JSON 数据中提取和打印值列表?
- c# - 在 Xamarin 中播放声音
- java - 线程“main”中的异常 java.util.NoSuchElementException 错误并且程序打印出错误的结果