java - 在 Consumer 之间均匀分布 Kafka 分区
问题描述
我有一个包含 300 个分区的主题,并且有 100 个消费者/机器。我使用 Spring Kafka 作为实现 Kafka 消费者的底层框架。
我使用的是ConcurrentKafkaListenerContainerFactory,并发设置为3,所以理论上我应该有300个Consumer Container,一个分区应该连接一个容器,这样分区就均匀分布在100台机器上。
对于第一个构造函数,kafka 将在消费者之间分配分区。对于第二个构造函数, ConcurrentMessageListenerContainer 将 TopicPartition 分布在委托 KafkaMessageListenerContainer 上。
例如,如果提供了 6 个 TopicPartition 并且并发为 3;每个容器将获得 2 个分区。5个TopicPartition,2个容器分2个partition,3个分1个。如果并发大于TopicPartition个数,则调低并发,每个容器分1个partition。
但我没有看到上述行为,我看到一些 Containers/Machines 是 Idle,而另一些则连接到 6 个分区,这导致了 Kafka Topic 中的 Lag。
我在这里做错了什么,如何确保分区在容器之间均匀映射,并且没有容器映射到多个分区?请帮忙。
key.deserializer : StringDeserializer
value.deserializer : [CUSTOM DESERIALIZER]
enable.auto.commit : false
max.poll.records : 5
group.id : [MY GROUP]
partition.assignment.strategy : StickyAssignor
max.partition.fetch.bytes : 1048576
bootstrap.servers : [SERVERS]
auto.commit.interval.ms : 3000
auto.offset.reset : latest
factory.setConcurrency(3);
@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)
EI_LISTNER_FACTORY 是一个 Bean..
@Bean(EI_LISTNER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> eiKafkaListenerContainerFactory() {
Boolean eiCnsumerStartup = [START_UP From Configuration]
Integer concurrentThreadCount = 3;
Map<String, Object> config = [properties from ABOVE]
ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
factory.setAutoStartup(eiConsumerStartup);
if (config.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(concurrentThreadCount);
}
return factory;
}
解决方案
配置看起来不错。可能当您描述消费者组时,很少有消费者无法访问/空闲。因此,重新平衡会导致将相同的服务器容器线程分配给多个分区。
如果不是这种情况,请启用 kafka 级别的日志来监控分区分配和撤销的日志,以检查重新平衡是否触发了预期的结果。
推荐阅读
- sql-server - 如何仅使用它们之间共有的一个 ID 使用另一个表中的多个值更新临时表中的多行?
- javascript - 以不完整对象作为参数提供的构造函数的嵌套默认值
- javascript - 在数组中生成许多元素
- java - 如何在 JAVA 中获得所需的输出
- angular - Angular:即使在添加导入后也找不到管道“过滤器”
- python - Json 树 - 表达式评估
- java - 我可以使用哪个 JDK 版本来支持 Lombok 版本 1.16.16?
- javascript - 赛普拉斯:有没有办法断言输入的值是否为空或至少有一定数量的字符
- python - 如何将 Yolo 格式的边界框坐标转换为 OpenCV 格式
- microsoft-graph-api - MSgraph,最多需要 48 小时才能看到更改