apache-kafka - 使用 kafka 和 python-kafka 库无限重新平衡消费者群体?
问题描述
我正在使用 docker swarm 部署 1 个 zookeeper 和 3 个 broker。我在那里有两个组,我的消费者应用程序被复制到一个特定的组来加入。
在团体中,我遇到了问题。当特定组和主题有一个消费者时,就没有问题了。但是,当我将消费者复制为大于一个消费者组时,似乎任一消费者都获得了所有分区,并且考虑到实施的分区算法,从来没有一个均匀的分布,除此之外,主要问题是群体的再平衡是无限的。
如果我不重新启动 kafka 服务器,则世代数将达到数十万。
在我的 kafka 配置(properties.server)中,我有:我的 group.max.session.timeout.ms = 300000
在我的消费者代码中,我使用的是 kafka-python:消费者使用以下配置进行实例化:
self.kafka_server = KafkaConsumer(
group_id=self.kafka_topic,
bootstrap_servers=self.kafka_brokers,
enable_auto_commit=True,
value_deserializer=lambda x: json.loads(x),
api_version=(0, 10),
session_timeout_ms=300000,
heartbeat_interval_ms=100000,
max_poll_interval_ms=300000,
partition_assignment_strategy=[RoundRobinPartitionAssignor, RangePartitionAssignor]
)
sleep(10)
self.kafka_server.subscribe(topics=self.kafka_topic)
我的投票代码是:
try:
while True:
fetch_from_all_partitions = self.kafka_server.poll(timeout_ms=3000, max_records=20)
消费者正在被正确联系并正在联系经纪人。但是,消费者日志显示没有问题。kafka 日志显示现在也有问题,似乎组协调员只是继续尝试无限地重新平衡。
谢谢
这是日志输出:
[2019-08-13 18:47:20,294] INFO [GroupCoordinator 2]: Preparing to rebalance group group_name in state PreparingRebalance with old generation 124 (__consumer_offsets-22) (reason: Adding new member kafka-python-1.4.6-e2ea4ede-b28a-4ff1-a986-ef42efe9101a with group instanceid None) (kafka.coordinator.group.GroupCoordinator)
[2019-08-13 18:47:20,295] INFO [GroupCoordinator 2]: Stabilized group group_name generation 125 (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)
[2019-08-13 18:47:20,296] INFO [GroupCoordinator 2]: Assignment received from leader for group group_name for generation 125 (kafka.coordinator.group.GroupCoordinator)
这是无限重复。
解决方案
推荐阅读
- swift - matchGeometryEffect 并不总是为位置变化设置动画
- java - 无法在递归问题中返回基本案例值
- javascript - [Vue 警告]:v-on 处理程序中的错误:“TypeError:this.auth.FacebookAuthProvider 不是构造函数”
- c++ - 对于关键部分 - 何时使用 std::mutex 与 std::atomic_flag?
- flutter - 将数据发送到新屏幕,但不知道如何设置
- outlook - 尝试使用 Microsoft Graph 获取/设置 messagerules API 时出现 ErrorAccessDenied
- google-apps-script - 具有动态模式的数据洞察连接器
- c# - 我可以简化我的 ConcurrentDictionary “添加或更新”方法吗?
- python - 有没有办法不使用mrjob在reducer def中包含第三个参数?
- python-3.x - 使用 SEQ 在 python 中记录 Azure 函数