首页 > 解决方案 > 偏移量为 0 的 Kafka 消费者轮询未返回消息

问题描述

我正在使用 spring-kafka 来轮询消息,当我为消费者使用注释并将偏移量设置为 0 时,它将最早看到所有消息。但是当我尝试使用注入的 ConsumerFactory 自己创建消费者时,poll 只会返回几条消息或根本没有消息。为了能够提取消息,我还需要其他一些配置吗?轮询超时已设置为 10 秒。

@Component
public class GenericConsumer {
  private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);

  @Autowired
  ConsumerFactory<String, Record> consumerFactory;

  public ConsumerRecords<String, Record> poll(String topic, String group){
    logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
    Consumer<String, Record> consumer = consumerFactory.createConsumer(group, "");
    consumer.subscribe(Arrays.asList(topic));
    // need to make a dummy poll before we can seek
    consumer.poll(1000);
    consumer.seekToBeginning(consumer.assignment());
    ConsumerRecords<String, Record> records;
    records = consumer.poll(10000);
    logger.info("------------ Total " + records.count() + " records polled");
    consumer.close();
    return records;
  }
}

标签: apache-kafkaspring-kafka

解决方案


它对我来说很好,这是 boot 2.0.5,Spring Kafka 2.1.10 ...

@SpringBootApplication
public class So52284259Application implements ConsumerAwareRebalanceListener {

    private static final Logger logger = LoggerFactory.getLogger(So52284259Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So52284259Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template, GenericConsumer consumer) {
        return args -> {
//          for (int i = 0; i < 1000; i++) { // load up the topic on first run
//              template.send("so52284259", "foo" + i);
//          }
            consumer.poll("so52284259", "generic");
        };
    }

    @KafkaListener(id = "listener", topics = "so52284259")
    public void listen(String in) {
        if ("foo999".equals(in)) {
            logger.info("@KafkaListener: " + in);
        }
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        consumer.seekToBeginning(partitions);
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("so52284259", 1, (short) 1);
    }

}

@Component
class GenericConsumer {

    private static final Logger logger = LoggerFactory.getLogger(GenericConsumer.class);

    @Autowired
    ConsumerFactory<String, String> consumerFactory;

    public void poll(String topic, String group) {
        logger.info("---------- Polling kafka recrods from topic " + topic + " group" + group);
        Consumer<String, String> consumer = consumerFactory.createConsumer(group, "");
        consumer.subscribe(Arrays.asList(topic));
        // need to make a dummy poll before we can seek
        consumer.poll(1000);
        consumer.seekToBeginning(consumer.assignment());
        ConsumerRecords<String, String> records;
        boolean done = false;
        while (!done) {
            records = consumer.poll(10000);
            logger.info("------------ Total " + records.count() + " records polled");
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            while (iterator.hasNext()) {
                String value = iterator.next().value();
                if ("foo999".equals(value)) {
                    logger.info("Consumer: " + value);
                    done = true;
                }
            }
        }
        consumer.close();
    }

}

2018-09-12 09:35:25.929  INFO 61390 --- [           main] com.example.GenericConsumer              : ------------ Total 500 records polled
2018-09-12 09:35:25.931  INFO 61390 --- [           main] com.example.GenericConsumer              : ------------ Total 500 records polled
2018-09-12 09:35:25.932  INFO 61390 --- [           main] com.example.GenericConsumer              : Consumer: foo999
2018-09-12 09:35:25.942  INFO 61390 --- [ listener-0-C-1] com.example.So52284259Application        : @KafkaListener: foo999

推荐阅读