首页 > 解决方案 > Kafka - 并非所有消费者都会收到订阅的消息

问题描述

为了一般使用 Kafka 发布消息,我使用类名作为主题:

kafkaProducer.send(new ProducerRecord(object.getClass().getName(), new DomainObjectAdapter(object).toJsonString()));

消费者订阅他们感兴趣的类:

    for(Object sub:_subscriptions)
        topics.add(sub.getClass().getName());
    _kafkaConsumer.subscribe(topics);

问题是,只有一个消费者收到订阅的消息。我的理解是,kafka 会为每个订阅者分配一个唯一的分区(如果有的话)。我目前只有 2 个订阅者,我的 kafka server.properties 指定了 4 个分区。似乎所有消费者都从同一个分区读取。由于这种明显的限制,对于服务总线来说,Kafka 可能是一个糟糕的选择。任何帮助将非常感激!

卡夫卡消费者属性:

    properties.put("bootstrap.servers", _settings.getEndpoint());
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("enable.auto.commit", "false");
    properties.put("group.id", "TestGroup");
    properties.put("auto.offset.reset","earliest");

Kafka生产者属性:

    properties.put("bootstrap.servers",_settings.getEndpoint());
    properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

服务器属性(我从默认属性中更改的唯一内容):

num.partitions=4

注意:我也尝试过消费者设置:

    properties.put("bootstrap.servers", _settings.getEndpoint());
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("auto.commit.interval.ms","1000");
    properties.put("enable.auto.commit", "true");
    properties.put("group.id", "testGroup");
    properties.put("auto.offset.reset","latest");
    properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");

标签: apache-kafkapartitionconsumer

解决方案


如果您的所有消费者都具有相同的消费者组(group.id属性),则该组中只有一个消费者会收到消息。如果您希望所有消费者都收到消息,他们需要有不同的group.id.

要检查哪些消费者绑定到主题的分区,可以使用以下命令

./bin/kafka-consumer-groups.sh --bootstrap-server yourhost:9092 --group testGroup --describe

推荐阅读