Empty TopicPartitions when using Spring Kafka ConsumerAwareRebalanceListener

Problem description

Question

I am using Spring Kafka version org.springframework.kafka:spring-kafka:2.0.0.RELEASE.

I try to seek to specific offsets in ConsumerAwareRebalanceListener.onPartitionsAssigned, but the collection of TopicPartitions passed to the callback is empty.

I have already tried ConsumerSeekAware as well, but it also receives an empty assignment.

Code

// Consumer configuration
Map<String, Object> configs = new HashMap<String, Object>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getConsumerBootstrapServers());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, config.getConsumerGroupId());
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getConsumerMaxPollRecords());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<Object, Object>(configs);

ContainerProperties containerProperties = new ContainerProperties(config.getTopic());
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setMessageListener(messageListener);
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer,
            Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // 'partitions' is logged as empty here, so the loop below never seeks
        log.info("onPartitionsAssigned_start,topic:{},partitionOffset:{},"
                + "consumer:{},topicPartitions:{},assignment:{}",
                config.getTopic(), JSON.toJSONString(partitionOffset),
                JSON.toJSONString(consumer), JSON.toJSONString(partitions),
                JSON.toJSONString(consumer.assignment()));
        for (TopicPartition tp : partitions) {
            // Skip partitions of other topics
            if (!tp.topic().equals(config.getTopic())) {
                continue;
            }
            // Skip partitions without a configured start offset
            Long offsetStart = partitionOffset.get(tp.partition());
            if (offsetStart == null) {
                continue;
            }
            consumer.seek(tp, offsetStart);
            log.info("consumer_seek,TopicPartition:{},offsetStart:{}",
                    JSON.toJSONString(tp), offsetStart);
        }
    }
});

container = new ConcurrentMessageListenerContainer<>(factory, containerProperties);
container.setConcurrency(config.getConcurrency());

container.start();
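
To narrow the problem down, the selection logic inside onPartitionsAssigned can be checked on its own, without a broker. The following is a minimal sketch, not a confirmed fix: SeekPlanner and its nested Tp class are hypothetical names introduced here (Tp only stands in for org.apache.kafka.common.TopicPartition), and the map argument plays the role of partitionOffset from the code above. It shows that an empty assignment simply produces no seeks, so the thing to investigate is why the assignment is empty. One case where Kafka legitimately hands a consumer an empty assignment is when the group has more consumers than the topic has partitions, which can happen if the container concurrency exceeds the partition count.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration only; not part of spring-kafka or kafka-clients. */
final class SeekPlanner {

    /** Minimal stand-in for org.apache.kafka.common.TopicPartition (assumption). */
    static final class Tp {
        final String topic;
        final int partition;

        Tp(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    /**
     * Returns partition -> start offset for every assigned partition of the given
     * topic that has a configured offset. An empty assignment yields an empty plan.
     */
    static Map<Integer, Long> plan(String topic, Collection<Tp> assigned,
            Map<Integer, Long> partitionOffset) {
        Map<Integer, Long> seeks = new LinkedHashMap<>();
        for (Tp tp : assigned) {
            // Same filter as the listener: ignore partitions of other topics
            if (!tp.topic.equals(topic)) {
                continue;
            }
            // Same filter as the listener: ignore partitions without a start offset
            Long start = partitionOffset.get(tp.partition);
            if (start != null) {
                seeks.put(tp.partition, start);
            }
        }
        return seeks;
    }
}
```

If the plan is non-empty for the expected partitions but the real callback still sees nothing, comparing config.getConcurrency() with the topic's partition count and logging every invocation of the callback (one per consumer thread) would be reasonable next checks.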

Am I using the API the wrong way? Any suggestions would be greatly appreciated!

Tags: java, spring, apache-kafka, offset

Solution

