首页 > 解决方案 > 使用 kafka 和 python-kafka 库无限重新平衡消费者群体?

问题描述

我正在使用 docker swarm 部署 1 个 zookeeper 和 3 个 broker。我在那里有两个组,我的消费者应用程序被复制到一个特定的组来加入。

在团体中,我遇到了问题。当特定组和主题有一个消费者时,就没有问题了。但是,当我将消费者复制为大于一个消费者组时,似乎任一消费者都获得了所有分区,并且考虑到实施的分区算法,从来没有一个均匀的分布,除此之外,主要问题是群体的再平衡是无限的。

如果我不重新启动 kafka 服务器,则世代数将达到数十万。

在我的 kafka 配置(properties.server)中,我有:我的 group.max.session.timeout.ms = 300000

在我的消费者代码中,我使用的是 kafka-python:消费者使用以下配置进行实例化:

   self.kafka_server = KafkaConsumer(
                group_id=self.kafka_topic,
                bootstrap_servers=self.kafka_brokers,
                enable_auto_commit=True,
                value_deserializer=lambda x: json.loads(x),
                api_version=(0, 10),
                session_timeout_ms=300000,
                heartbeat_interval_ms=100000,
                max_poll_interval_ms=300000,
                partition_assignment_strategy=[RoundRobinPartitionAssignor, RangePartitionAssignor]
            )
            sleep(10)
            self.kafka_server.subscribe(topics=self.kafka_topic)

我的投票代码是:

  try:
            while True:
                fetch_from_all_partitions = self.kafka_server.poll(timeout_ms=3000, max_records=20)

消费者正在被正确联系并正在联系经纪人。但是,消费者日志显示没有问题。kafka 日志显示现在也有问题,似乎组协调员只是继续尝试无限地重新平衡。

谢谢

这是日志输出:

[2019-08-13 18:47:20,294] INFO [GroupCoordinator 2]: Preparing to rebalance group group_name in state PreparingRebalance with old generation 124 (__consumer_offsets-22) (reason: Adding new member kafka-python-1.4.6-e2ea4ede-b28a-4ff1-a986-ef42efe9101a with group instanceid None) (kafka.coordinator.group.GroupCoordinator)

[2019-08-13 18:47:20,295] INFO [GroupCoordinator 2]: Stabilized group group_name generation 125 (__consumer_offsets-22) (kafka.coordinator.group.GroupCoordinator)

[2019-08-13 18:47:20,296] INFO [GroupCoordinator 2]: Assignment received from leader for group group_name for generation 125 (kafka.coordinator.group.GroupCoordinator)

这是无限重复。

标签: apache-kafkakafka-consumer-api

解决方案


推荐阅读