apache-kafka - Why is my Kafka load not balanced across the brokers?
Problem description
I have a consumer group reading from a topic with ten partitions:
[root@kafka01 kafka]# ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group ssIncomingGroup --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
ssIncomingGroup ssIncoming 3 13688 13987 299 ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 7 13484 13868 384 ssTS@influx01 (github.com/segmentio/kafka-go)-f1c5b4c7-9cf0-4132-902a-db9d0429d520 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 2 13322 13698 376 ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 8 13612 13899 287 ssTS@influx01 (github.com/segmentio/kafka-go)-20ee82a9-825d-4d9a-9f20-f4610c21f171 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 1 13568 13932 364 ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 6 13651 13950 299 ssTS@influx01 (github.com/segmentio/kafka-go)-df68ca85-d722-47ef-82c2-2fd60e186fac /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 0 13609 13896 287 ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 5 13646 13945 299 ssTS@influx01 (github.com/segmentio/kafka-go)-10b7f10f-9535-4338-9851-f583a9a7c935 /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 4 13543 13843 300 ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
ssIncomingGroup ssIncoming 9 13652 13951 299 ssTS@influx01 (github.com/segmentio/kafka-go)-3c847add-172f-4007-adf2-ce486686dd7c /192.168.33.10 ssTS@influx01 (github.com/segmentio/kafka-go)
I'm using the Segment.io Kafka library for Go: "github.com/segmentio/kafka-go".
My Kafka writer looks like this:
kafkaWriter := kafka.NewWriter(kafka.WriterConfig{
    Async:         false,
    Brokers:       config.KafkaHosts, // a string slice of 4 Kafka hosts
    QueueCapacity: kafkaQueueCapacity,
    Topic:         kafkaTopic,
    Balancer:      &kafka.LeastBytes{}, // Same result with the default round-robin balancer
})
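For reference, the produce path is just kafkaWriter.WriteMessages; a simplified sketch (placeholder payloads, assuming the context and log packages are imported):
err := kafkaWriter.WriteMessages(context.Background(),
    kafka.Message{Value: []byte("payload 1")}, // the balancer picks a partition for each message
    kafka.Message{Value: []byte("payload 2")},
)
if err != nil {
    log.Fatalf("kafka write failed: %v", err)
}
With LeastBytes (and likewise with the default round-robin balancer) the partition is chosen by the balancer rather than by a message key, so writes should spread across all ten partitions; each write then goes to whichever broker is the leader of the chosen partition.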
My Kafka reader looks like this:
kafkaReader := kafka.NewReader(kafka.ReaderConfig{
    Brokers: config.KafkaHosts, // same as above
    GroupID: config.KafkaGroup,
    Topic:   config.KafkaTopic, // same as above
})
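The consume loop is essentially just ReadMessage in a loop; a simplified sketch (assuming context, fmt and log are imported):
for {
    msg, err := kafkaReader.ReadMessage(context.Background())
    if err != nil {
        log.Fatalf("kafka read failed: %v", err)
    }
    fmt.Printf("partition=%d offset=%d value=%s\n", msg.Partition, msg.Offset, msg.Value)
}
Because GroupID is set, each reader instance joins the consumer group, is assigned a share of the ten partitions, and ReadMessage commits offsets automatically; that matches the two-partitions-per-consumer assignment shown in the kafka-consumer-groups.sh output above.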
The topic was originally created like this:
conn.CreateTopics(kafka.TopicConfig{
    NumPartitions:     config.KafkaPartitions,  // == 10
    ReplicationFactor: config.KafkaReplication, // == 1
    Topic:             kafkaTopic,              // same as above
})
When I run my program and watch the host and network load, I see that almost all of the load/network activity is on one of the four Kafka brokers. And when I du the Kafka log directories on the Kafka hosts, that same host has far more Kafka data on the filesystem than the others (e.g., 150M instead of 15M).
What I want and expect to happen is for the load to be spread across all four Kafka servers, so that no single one becomes a bottleneck (in CPU or network). Why is that not happening?
Edit (adding the requested command output):
[root@kafka01 kafka]# bin/kafka-topics.sh --describe --bootstrap-server localhost:9092
Topic: ssIncoming PartitionCount: 10 ReplicationFactor: 1 Configs: flush.ms=1000,segment.bytes=536870912,flush.messages=10000,retention.bytes=1073741824
Topic: ssIncoming Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: ssIncoming Partition: 2 Leader: 3 Replicas: 3 Isr: 3
Topic: ssIncoming Partition: 3 Leader: 1 Replicas: 1 Isr: 1
Topic: ssIncoming Partition: 4 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 5 Leader: 2 Replicas: 2 Isr: 2
Topic: ssIncoming Partition: 6 Leader: 3 Replicas: 3 Isr: 3
Topic: ssIncoming Partition: 7 Leader: 1 Replicas: 1 Isr: 1
Topic: ssIncoming Partition: 8 Leader: 4 Replicas: 4 Isr: 4
Topic: ssIncoming Partition: 9 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets PartitionCount: 50 ReplicationFactor: 1 Configs: compression.type=producer,cleanup.policy=compact,flush.ms=1000,segment.bytes=104857600,flush.messages=10000,retention.bytes=1073741824
Topic: __consumer_offsets Partition: 0 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 2 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 3 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 4 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 5 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 6 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 7 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 8 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 9 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 10 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 11 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 12 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 13 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 14 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 15 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 16 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 17 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 18 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 19 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 20 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 21 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 22 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 23 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 24 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 25 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 26 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 27 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 28 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 29 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 30 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 31 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 32 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 33 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 34 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 35 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 36 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 37 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 38 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 39 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 40 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 41 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 42 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 43 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 44 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 45 Leader: 1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 46 Leader: 2 Replicas: 2 Isr: 2
Topic: __consumer_offsets Partition: 47 Leader: 3 Replicas: 3 Isr: 3
Topic: __consumer_offsets Partition: 48 Leader: 4 Replicas: 4 Isr: 4
Topic: __consumer_offsets Partition: 49 Leader: 1 Replicas: 1 Isr: 1
(Edit 2): These are the variables I use when generating the Kafka config files. They are the same for each of the 4 brokers.
scala_version: 2.12
kafka_config_broker_id: 0
kafka_config_log_dirs: "/tmp/kafka_logs"
kafka_config_log_flush_interval_messages: 10000
kafka_config_log_flush_interval_ms: 1000
kafka_config_log_retention_bytes: 1073741824
kafka_config_log_retention_check_interval: 60000
kafka_config_log_retention_hours: 168
kafka_config_log_segment_bytes: 536870912
kafka_config_num_io_threads: 4
kafka_config_num_network_threads: 2
kafka_config_num_partitions: 2
kafka_config_offsets_topic_replication_factor: 1
kafka_config_receive_buffer_bytes: 1048576
kafka_config_request_max_bytes: 104857600
kafka_config_send_buffer_bytes: 1048576
kafka_config_zookeeper_connection_timeout_ms: 1000000
kafka_config_zookeeper_servers:
- consul01
- consul02
- consul03
kafka_exporter_version: 1.2.0
kafka_port: 9092
kafka_version: 2.4.0
This data is used in an Ansible template. The generated Kafka conf looks like this:
broker.id=1
port=9092
num.network.threads=2
num.io.threads=4
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka_logs
num.partitions=2
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.retention.bytes=1073741824
log.segment.bytes=536870912
log.retention.check.interval.ms=60000
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
offsets.topic.replication.factor=1
zookeeper.connect=consul01:2181,consul02:2181,consul03:2181
zookeeper.connection.timeout.ms=1000000
delete.topic.enable=true
Note that this is a development environment and these machines are torn down and re-provisioned frequently (several times a day). The problem persists after every rebuild.
Solution
Right now the load actually looks well balanced:
- Partition leaders are distributed across the brokers in the best balance possible (a quick way to check this from Go is sketched right after this list):
- Broker 1 is the leader of partitions 3, 7
- Broker 2 is the leader of partitions 1, 5, 9
- Broker 3 is the leader of partitions 2, 6
- Broker 4 is the leader of partitions 0, 4, 8
- Partitions are also assigned evenly to the consumers (2 partitions per consumer)
- The offsets within the partitions are nearly identical (so it looks like you are producing messages to the partitions evenly)
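As promised above, a quick way to double-check the leader distribution from Go, using the same kafka-go library (just a sketch of mine; the broker address is a placeholder for any one of your four brokers, and fmt and log are assumed to be imported):
conn, err := kafka.Dial("tcp", "kafka01:9092") // any reachable broker works
if err != nil {
    log.Fatalf("dial failed: %v", err)
}
defer conn.Close()

partitions, err := conn.ReadPartitions("ssIncoming")
if err != nil {
    log.Fatalf("reading partitions failed: %v", err)
}
for _, p := range partitions {
    // p.Leader is the broker currently serving all reads and writes for this partition.
    fmt.Printf("partition %d -> leader broker %d (%s:%d)\n", p.ID, p.Leader.ID, p.Leader.Host, p.Leader.Port)
}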
When I look at the Kafka hosts' log directories, that same host has far more Kafka data on the FS than the others (e.g., 150M instead of 15M).
The log offsets of the partitions are nearly identical. But of course brokers 2 and 4 have to store more data, because, as you can see, they each serve more partitions. They also have to handle more network traffic, since they each serve 3 partitions (fetch requests from the consumers as well as produce requests from the producers).
But it still doesn't make sense for one broker to hold 10 times more data. IMHO, at some point one or more of the brokers was unhealthy (unable to send heartbeats to Zookeeper, or simply down), the controller moved its partitions to the healthy brokers, and for a while some brokers were handling more partitions than others. (By the way, for leadership to be rebalanced back afterwards, auto.leader.rebalance.enable must be true; it is not set in your generated config above, so Kafka's default of true applies.)
Note: I'm assuming that your brokers' configurations (especially the log.retention settings, which play a big role in how much data a broker stores) and the brokers' system resources are identical. If they are not, you should mention it.
By the way, if you're not happy with the current assignment of partitions to brokers, you can change it manually with the kafka-reassign-partitions.sh tool. You just need to create a JSON file that specifies the replicas for each partition.
For example:
{"version":1,
"partitions":[
{"topic":"ssIncoming","partition":0,"replicas":[1]},
{"topic":"ssIncoming","partition":1,"replicas":[1]},
{"topic":"ssIncoming","partition":2,"replicas":[1]},
{"topic":"ssIncoming","partition":3,"replicas":[2]},
{"topic":"ssIncoming","partition":4,"replicas":[2]},
{"topic":"ssIncoming","partition":5,"replicas":[3]},
{"topic":"ssIncoming","partition":6,"replicas":[3]},
{"topic":"ssIncoming","partition":7,"replicas":[3]},
{"topic":"ssIncoming","partition":8,"replicas":[4]},
{"topic":"ssIncoming","partition":9,"replicas":[4]}
]}
Then you just need to run this command:
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file change-replicas.json --execute
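Once the reassignment has been kicked off, the same tool can report whether it has finished: run it again with --verify instead of --execute, pointing at the same JSON file:
./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file change-replicas.json --verify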