java - 如何将 Kafka 配置为像 FiFo 队列一样工作?
问题描述
我有一个小问题,那就是我必须使用 kafka 作为通知系统来启动我的应用程序的流程,一切都很好,但我想知道当队列中有未读消息时是否可以更改顺序.
让我解释一下:当主题中有两条或更多消息,并且消费者阅读它们时,首先阅读,最后一个进入,我希望它首先阅读,第一个进入。
所以我想知道这是否可能,如果可能,我们必须为此做出改变。
我正在使用最新版本的 Spring Apache 和 Kafka 2.5.0
消费者配置:
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
private static final String BOOTSTRAP_ADDRESS = "";
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_ADDRESS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "id");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
}
卡夫卡消费者:
@Component
public class KafkaConsumer {
@KafkaListener(topics = "Great_Topic")
private void listen(String msg) {
System.out.println("I've received: " + msg);
}
}
例子:
在我的生产者中,我按此顺序发送了以下消息:
- 你好世界 1
- 你好世界2
消费者的输出:
- 你好世界2
- 你好世界 1
我希望输出与输入具有相同的顺序。
解决方案
当你从一个主题消费时,你可以选择从头开始阅读,当你这样做时,你会按照它们在分区中产生的顺序获取所有消息。
如果您只有一个分区,您将获得严格的 FIFO 行为。
如果您有多个分区(就像您在这里一样),那么您会在一个分区内获得 FIFO,但跨分区它将是不确定的。因此,您可以获得您展示的行为。
推荐阅读
- html - 嵌套的 div 元素溢出 flex 容器
- python - 安装opencv后如何解决python中“在'__init__.py中找不到引用'imread'”的错误?
- android - 颤动的android MissingPluginException
- python-3.x - BLED112设置加密?
- docker - 在“eshoponcontainers”中,大多数 dockerfiles 都有复制(所有 csproj)和恢复,它不会对容器造成过度影响吗?
- sql-server-2017 - 语法错误 (*EngineEdition != 11) (Microsoft.SqlServer.Management.Sdk.Sfc)
- javascript - 将图像发布到 django rest API 始终返回“未提交文件”
- powershell - 解析从 Ansble 到 powershell 的变量
- c - 在异构 Linux 系统中从共享内存创建虚拟 FS
- vb.net - 如何在 MySQL 查询中使用 VB.NET 参数连接单引号?