java - Spring @KafkaListener 和并发
问题描述
我正在使用 spring boot + spring @KafkaListener。我期望的行为是:我的 kafka 监听器在 10 个线程中读取消息。因此,如果其中一个线程挂起,其他消息将继续读取和处理消息。
我定义了 bean
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory)
{
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setMissingTopicsFatal(false);
factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
return factory;
}
和弹簧启动配置:
spring.kafka.listener.concurrency=10
我看到所有配置都有效,我在 jmx 中看到了我的 10 个线程:
但后来我做了这样的测试:
@KafkaListener(topics = {
"${topic.name}" }, clientIdPrefix = "${kafka.client.id.prefix}", idIsGroup = false, id = "${kafka.listener.name}", containerFactory = "kafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record)
{
if(record.getVersion() < 3) {
try {
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
else
System.out.println("It works!");
}
如果版本 < 3,则挂起,否则 - 工作。我发送了 3 条版本为 1,2 和 3 的消息。我希望版本 1 和 2 的消息会挂起,但版本 3 将在监听器时处理。但不幸的是,版本 3 的消息在开始处理之前等待消息 1 和 2。
也许我的期望不正确,这是卡夫卡听众的正确行为。请帮我处理kafka并发,为什么会这样?
解决方案
卡夫卡不是这样工作的。您至少需要与消费者一样多的分区(由concurrency
弹簧容器控制)。
此外,一次只有一个消费者(在一个组中)可以从一个分区消费,因此,即使您增加分区,“卡住”消费者后面的同一分区中的记录也不会被其他消费者接收。
推荐阅读
- c# - 找不到与给定名称匹配的资源(在 'src' 处,值为 '@android:drawable/logo.png
- vaadin - 如何在服务器端 Java 代码中获取 Vaadin 10 中的浏览器窗口宽度
- c - 在c中创建n个链表
- tensorflow - 如何在不使用导出和重新加载的情况下从 TF ODI 训练器中提取推理图?
- zend-studio - Zend Studio 问题添加 Zend Server - Windows 10
- android - 如何在Android Studio类上自动插入注释@Since
- haskell - TimeDuration 类型可以是 Semigroup 和 Monoid 的实例吗?
- angular - 如何将登录名与应用程序的其余部分分开?
- javascript - 如何在 JavaScript if/else 语句和 Bootstrap 4 下拉菜单中使用 innerHTML
- c# - C#如何获取字典的第一个元素并移动到另一个