首页 > 解决方案 > 如何将 Kafka 配置为像 FiFo 队列一样工作?

问题描述

我有一个小问题,那就是我必须使用 kafka 作为通知系统来启动我的应用程序的流程,一切都很好,但我想知道当队列中有未读消息时是否可以更改顺序.

让我解释一下:当主题中有两条或更多消息,并且消费者阅读它们时,首先阅读,最后一个进入,我希望它首先阅读,第一个进入。

所以我想知道这是否可能,如果可能,我们必须为此做出改变。

我正在使用最新版本的 Spring Apache 和 Kafka 2.5.0

消费者配置:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    private static final String BOOTSTRAP_ADDRESS = "";

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }


    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}


卡夫卡消费者:


@Component
public class KafkaConsumer {

    @KafkaListener(topics = "Great_Topic")
    private void listen(String msg) {
        System.out.println("I've received: " + msg);
    }
}


例子:

在我的生产者中,我按此顺序发送了以下消息:

消费者的输出:

我希望输出与输入具有相同的顺序。

标签: javaspringapache-kafka

解决方案


当你从一个主题消费时,你可以选择从头开始阅读,当你这样做时,你会按照它们在分区中产生的顺序获取所有消息。

如果您只有一个分区,您将获得严格的 FIFO 行为

如果您有多个分区(就像您在这里一样),那么您会在一个分区内获得 FIFO,但跨分区它将是不确定的。因此,您可以获得您展示的行为。


推荐阅读