首页 > 解决方案 > setConsumerRebalanceListener 如何获取消费者

问题描述

我使用 spring-kafka 1.1.3.RELEASE 和 kafka-clients 0.10.0.0 并且我希望像这样在工厂中使用 setConsumerRebalanceListener ,但我不知道如何让消费者保存消费者分区。谢谢你的任何建议!

@Bean   

KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerBatchContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.setConcurrency(3);
            factory.getContainerProperties().setPollTimeout(3000);
            factory.getContainerProperties().setConsumerTaskExecutor(execD());
            factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);

            factory.getContainerProperties().setSyncCommits(true);

            factory.getContainerProperties().setConsumerRebalanceListener(new ConsumerRebalanceListener() {

                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                    for (TopicPartition partition:collection){
                    //TODO how to get consumer?    saveOffsetInExternalStore(consumer,partition.partition());
                    }
                }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                     for (TopicPartition partition:collection){
                    //TODO how to get consumer?      
                       consumer.seek();
                    }
                }
            });
            factory.setBatchListener(true);

            return factory;
        }

我这样使用工厂:

 @KafkaListener(group = "CID_alikafka_B024",topicPattern = "data_.*",containerFactory = "kafkaListenerBatchContainerFactory")
    public void receive2(List<String> data,Acknowledgment acknowledgment,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                         @Header(KafkaHeaders.OFFSET) List<Long> offsets,
                         @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topicName,
                         @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<String> messageKeys) {
        logger.info("start of batch receive");

    }

我知道 spring kafka 2.1.9 有这样的 ConsumerAwareRebalanceListener,但我想使用 spring kafka 1.1.3.RELEASE 来兼容 kafka 0.10.0.0,我们 kafka 的版本是 0.10.0.0

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
            store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
            consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});

我的pom是:

 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.3.RELEASE</version>
            <exclusions>
                <!-- exclude kafka version problem-->
                <exclusion>
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>

标签: apache-kafkakafka-consumer-apispring-kafka

解决方案


1.1.3 在快速发展的 Apache Kafka 世界中已经过时了。您无法访问那里的消费者。

Spring for Apache KafkaConsumerAwareRebalanceListener在 2.0. 当前版本是 2.1.8。

如果您无法升级代理,较新版本的 Kafka 客户端可以与旧代理通信(但您应该升级,0.10.0.0 也很旧)。

请参阅文档

另请参阅解释兼容性的项目页面。从融合页面:

Broker 0.10.0 基本客户端兼容性:Java:客户端 <= 0.10.0 或 >= 0.10.2


推荐阅读