java - 使用spring kafka ConsumerAwareRebalanceListener时得到空的TopicPartitions
问题描述
问题
使用 spring kafka 版本:org.springframework.kafka:spring-kafka:2.0.0.RELEASE。
通过 ConsumerAwareRebalanceListener.onPartitionsAssigned 寻求偏移量,但得到一个空的 TopicPartitions。
已经尝试过 ConsumerSeekAware,但也得到了一个空任务。
代码
Map<String, Object> configs = new HashMap<String, Object>();
configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.getConsumerBootstrapServers());
configs.put(ConsumerConfig.GROUP_ID_CONFIG, config.getConsumerGroupId());
configs.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, config.getConsumerMaxPollRecords());
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory<Object, Object>(configs);
ContainerProperties containerProperties = new ContainerProperties(config.getTopic());
containerProperties.setAckMode(AckMode.RECORD);
containerProperties.setMessageListener(messageListener);
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer,
Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
log.info("onPartitionsAssigned_start,topic:{},partitionOffset:{},"
+ "consumer:{},topicPartitions:{},assignment:{}",
config.getTopic(),JSON.toJSONString(partitionOffset),
JSON.toJSONString(consumer),JSON.toJSONString(partitions),JSON.toJSONString(consumer.assignment()));
for(TopicPartition tp:partitions) {
if(!tp.topic().equals(config.getTopic())) {
continue;
}
Long offsetStart = partitionOffset.get(tp.partition());
if(offsetStart==null) {
continue;
}
consumer.seek(tp, offsetStart);
log.info("consumer_seek,TopicPartition:{},offsetStart:{}",
JSON.toJSONString(tp),offsetStart);
}
}
});
container = new ConcurrentMessageListenerContainer<>(factory,containerProperties);
container.setConcurrency(config.getConcurrency());
container.start();
我以错误的方式使用api吗?任何建议将不胜感激!