apache-kafka - Kafka - 并非所有消费者都会收到订阅的消息
问题描述
为了一般使用 Kafka 发布消息,我使用类名作为主题:
kafkaProducer.send(new ProducerRecord(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));
消费者订阅他们感兴趣的类:
for(Object sub:_subscriptions)
topics.add(sub.getClass().getName());
_kafkaConsumer.subscribe(topics);
问题是,只有一个消费者收到订阅的消息。我的理解是,kafka 会为每个订阅者分配一个唯一的分区(如果有的话)。我目前只有 2 个订阅者,我的 kafka server.properties 指定了 4 个分区。似乎所有消费者都从同一个分区读取。由于这种明显的限制,对于服务总线来说,Kafka 可能是一个糟糕的选择。任何帮助将非常感激!
卡夫卡消费者属性:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
properties.put("group.id", "TestGroup");
properties.put("auto.offset.reset","earliest");
Kafka生产者属性:
properties.put("bootstrap.servers",_settings.getEndpoint());
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
服务器属性(我从默认属性中更改的唯一内容):
num.partitions=4
注意:我也尝试过消费者设置:
properties.put("bootstrap.servers", _settings.getEndpoint());
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("auto.commit.interval.ms","1000");
properties.put("enable.auto.commit", "true");
properties.put("group.id", "testGroup");
properties.put("auto.offset.reset","latest");
properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
解决方案
如果您的所有消费者都具有相同的消费者组(group.id
属性),则该组中只有一个消费者会收到消息。如果您希望所有消费者都收到消息,他们需要有不同的group.id
.
要检查哪些消费者绑定到主题的分区,可以使用以下命令
./bin/kafka-consumer-groups.sh --bootstrap-server yourhost:9092 --group testGroup --describe
推荐阅读
- windows - 为什么即使检查隐藏文件后我的主机文件也不可见?
- python - 用于从以下用户获取提要的 Django 查询集
- python - 在嵌套循环中查找 max 的最小索引,键严格在第 0 个索引中
- c# - 嵌套应用程序显示外部 404 页面而不是它自己的 404 页面
- java - Hibernate 缓存并发中的只读与非限制读写
- typescript - 在 Typescript 中获取所有 Private 成员列表的方法?
- java - 如何正确定义 DTO 属性
- c# - 如何使业务层与不同的模型一起工作?
- python - if/else 语句中的 return 语句在 Python 中不返回值
- c - 实现我自己的互斥锁时如何从内联汇编中引用 C 中的指针