apache-kafka - setConsumerRebalanceListener 如何获取消费者
问题描述
我使用 spring-kafka 1.1.3.RELEASE 和 kafka-clients 0.10.0.0 并且我希望像这样在工厂中使用 setConsumerRebalanceListener ,但我不知道如何让消费者保存消费者分区。谢谢你的任何建议!
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerBatchContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
factory.getContainerProperties().setConsumerTaskExecutor(execD());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {
@Override
public void onPartitionsRevoked(Collection<TopicPartition> collection) {
for (TopicPartition partition:collection){
//TODO how to get consumer? saveOffsetInExternalStore(consumer,partition.partition());
}
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
for (TopicPartition partition:collection){
//TODO how to get consumer?
consumer.seek();
}
}
});
factory.setBatchListener(true);
return factory;
}
我这样使用工厂:
@KafkaListener(group = "CID_alikafka_B024",topicPattern = "data_.*",containerFactory = "kafkaListenerBatchContainerFactory")
public void receive2(List<String> data,Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topicName,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> messageKeys) {
logger.info("start of batch receive");
}
我知道 spring kafka 2.1.9 有这样的 ConsumerAwareRebalanceListener,但我想使用 spring kafka 1.1.3.RELEASE 来兼容 kafka 0.10.0.0,我们 kafka 的版本是 0.10.0.0
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
我的pom是:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>1.1.3.RELEASE</version>
<exclusions>
<!-- exclude kafka version problem-->
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.0.0</version>
</dependency>
解决方案
推荐阅读
- javascript - 如何在 Firebase 中正确更新电子邮件和敏感数据?
- html - 使用放松时的视差空白
- android - 为什么我从 reCaptcha API 得到的响应为空?
- node.js - 需要帮助了解为什么我的后端 Nodejs 没有收到我的 React Redux 前端的图片上传请求正文
- matlab - Matlab 2021b 无法导出到独立的 FMU
- python - 升级到 Django 3.2 后,我在 `ArrayAgg` 字段上的注释不起作用
- r - 在 R 中绘制多个回归时出现点须错误
- game-development - 如何在 Godot 中将生物群系添加到具有开放单纯形噪声的程序生成地形
- java - postgres jdbc 批量查询复合 PK
- r - 是否可以在 R 包 vcd 中使用 pair_barplot 更改 barplot 刻度线标签的字体大小?