首页 > 解决方案 > 如何设置 Kafka 消费者配置以从现在开始消费消息?

问题描述

我是 Kakfa 的新手,正在学习如何在 Kafka 主题中生成和使用消息。

我正在使用 @EnableKafka 使用 Kafka 配置

@EnableKafka
@Configuration
public class ConsumerConfig implements ApplicationContextAware {

    @Value("${kafka.servers}")
    private String kafkaServerAddress;

    @Value("${kafka.ca.groupid}")
    private String groupId;


    private ApplicationContext context;

    public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {

        Map<String, Object> props = new HashMap<>();
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> binlogListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        DefaultKafkaConsumerFactory<String, Object> defaultFactory = consumerFactory();
        defaultFactory.setKeyDeserializer(new StringDeserializer());
        defaultFactory.setValueDeserializer(new JsonDeserializer(BinlogMessage.class));
        factory.setConsumerFactory(defaultFactory);
        return factory;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        context = applicationContext;
    }

}

标签: apache-kafkakafka-consumer-apiproducer-consumerspring-kafka

解决方案


得到答案,可以通过将属性 AUTO_OFFSET_RESET_CONFIG 设置为 latest 来完成,如下所示:

public DefaultKafkaConsumerFactory<String, Object> consumerFactory() {

    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    return new DefaultKafkaConsumerFactory<>(props);
}

推荐阅读