首页 > 解决方案 > 在 Consumer 之间均匀分布 Kafka 分区

问题描述

我有一个包含 300 个分区的主题,并且有 100 个消费者/机器。我使用 Spring Kafka 作为实现 Kafka 消费者的底层框架。

我使用的是ConcurrentKafkaListenerContainerFactory,并发设置为3,所以理论上我应该有300个Consumer Container,一个分区应该连接一个容器,这样分区就均匀分布在100台机器上。

对于第一个构造函数,kafka 将在消费者之间分配分区。对于第二个构造函数, ConcurrentMessageListenerContainer 将 TopicPartition 分布在委托 KafkaMessageListenerContainer 上。

例如,如果提供了 6 个 TopicPartition 并且并发为 3;每个容器将获得 2 个分区。5个TopicPartition,2个容器分2个partition,3个分1个。如果并发大于TopicPartition个数,则调低并发,每个容器分1个partition。

但我没有看到上述行为,我看到一些 Containers/Machines 是 Idle,而另一些则连接到 6 个分区,这导致了 Kafka Topic 中的 Lag。

我在这里做错了什么,如何确保分区在容器之间均匀映射,并且没有容器映射到多个分区?请帮忙。

key.deserializer : StringDeserializer
value.deserializer : [CUSTOM DESERIALIZER]
enable.auto.commit  : false
max.poll.records : 5
group.id : [MY GROUP]
partition.assignment.strategy : StickyAssignor
max.partition.fetch.bytes : 1048576
bootstrap.servers : [SERVERS]
auto.commit.interval.ms : 3000
auto.offset.reset : latest


factory.setConcurrency(3);

@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)

EI_LISTNER_FACTORY 是一个 Bean..

@Bean(EI_LISTNER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> eiKafkaListenerContainerFactory() {

    Boolean eiCnsumerStartup = [START_UP From Configuration]

    Integer concurrentThreadCount = 3;

    Map<String, Object> config = [properties from ABOVE]
    ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
    factory.setAutoStartup(eiConsumerStartup);

    if (config.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConcurrency(concurrentThreadCount);
    }
    return factory;

}

标签: javaapache-kafkakafka-consumer-apispring-kafka

解决方案


配置看起来不错。可能当您描述消费者组时,很少有消费者无法访问/空闲。因此,重新平衡会导致将相同的服务器容器线程分配给多个分区。

如果不是这种情况,请启用 kafka 级别的日志来监控分区分配和撤销的日志,以检查重新平衡是否触发了预期的结果。


推荐阅读