spring - 使用 Spring Kafka 进行 Kafka 主题排序
问题描述
我正在尝试建立一个系统,该系统将从两个不同的 Kafka 主题中读取 - 一个用于实时消息,一个用于批量消息。希望是,无论有多少关于“批量”主题的消息,都优先考虑“实时”主题的任何内容。
似乎Spring Kafka开箱即用 - 有时。
我所拥有的只是:
@KafkaListener(topics = {"sync-live", "sync-bulk"}, concurrency = "1")
我的配置是:
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
有时这会完全符合我的要求,有时则不会。事实上,有几次我看到它从“实时”主题切换到“批量”主题,但仍有待处理的实时消息!
有没有办法告诉 Spring Kafka 在第一个主题为空时只读取第二个主题?
干杯
解决方案
这与 Spring 无关,来自消费者订阅的主题的消息分发是代理的功能。
问题是您对两个主题都使用单个使用者(侦听器容器)。
要为每个消费者获得专门的消费者,请使用多个注释......
@KafkaListener(groupId = "live.group", topics = "sync-live", concurrency = "1")
@KafkaListener(groupId = "bulk.group", topics = "sync-bulk", concurrency = "1")
public synchronized void listen(...) { ... }
这synchronized
将阻止两个容器同时调用侦听器。如果这不是问题,请忽略它。
推荐阅读
- r - R中data.table中的条件求和
- python-3.x - 从文件名中提取字符并将其作为值添加到 csv python
- spring - 为什么 spring.jpa.hibernate.ddl-auto 设置更新但 schema.sql 和 data.sql 仍在执行?
- html - 连接
- 和
- 在
我想创建页眉,我需要在一行中连接 dt 和 dd 这是我的视图代码:
<div style="margin-left:100px"> <hr /> <dl class="horizontal"> <dt>@Html.DisplayNameFor(model => model.labCashView.Patient_Nam
- php - Laravel:将“selected”属性添加到 HTML 选择选项,无论是从旧值还是数据库
- c - 使用 strcpy 和 strncpy 将目录中的文件列出到 C 中的数组中
- python - pytest html - 从测试文件将图像传递给 conftest.py 中的钩子
- google-bigquery - 比较表中的两列,得到相似和不相似的值
- python - conda env export --from-history 不跟踪频道
- dataframe - 如何在pyspark中按有序分类变量创建和排序