Some Python Confluent Kafka consumers hang and never start working

Problem description

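I have 5 consumers for an Avro topic. Each consumer has a different, randomly generated group.id, so no group.id is shared between them. I noticed that, no matter how many times I restart them, a number of the consumers fail to connect to the broker, and sometimes even more of them do. They just hang.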
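The question does not show how the random group.id is produced. A minimal sketch, assuming a fixed prefix plus a random suffix from `uuid.uuid4()`; the function name `random_group_id` and the default prefix are hypothetical:

```python
import uuid


def random_group_id(prefix: str = "services_registry_consumer") -> str:
    """Return a group.id that is unique per consumer instance.

    Hypothetical helper: the prefix is only a naming convention,
    the random 12-hex-character suffix is what keeps ids distinct.
    """
    return f"{prefix}_{uuid.uuid4().hex[:12]}"


if __name__ == "__main__":
    # Each call yields a different id, so no two consumers share a group.
    print(random_group_id())
    print(random_group_id())
```

Because every id is distinct, each consumer is the only member of its own group and receives all partitions of the topic.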

So I started changing the value assigned to group.id for the problematic consumers, and found that certain values make a consumer work while others do not.

First, here is what I am using:

avro-python3    1.10.1
confluent-kafka 1.5.0
python          3.7
Apache Kafka broker version 0.9.0.1
Confluent Community edition with 5 brokers and Schema Registry

Here is the consumer:


import sys

from confluent_kafka import KafkaError, KafkaException
from confluent_kafka.avro import AvroConsumer


class MyAvroConsumer:
    # Renamed: a class called AvroConsumer shadows confluent_kafka.avro.AvroConsumer,
    # so AvroConsumer(consumer_conf) below would call this class recursively.

    def __init__(self, **kwargs):
        # subscribe() expects a list of topic names.
        self.consumer_topic = ['my-avro-topic']
        consumer_conf = {'session.timeout.ms': 6000,
                         'enable.auto.commit': 'true',
                         'log.connection.close': 'false',
                         'default.topic.config': {'auto.offset.reset': 'latest'},
                         'schema.registry.url': 'http://xxxxxx:8081',
                         'bootstrap.servers':
                             'aaaaaa:9092,bbbbbbbbb:9092,cccccc:9092,dddddd:9092,eeeeee:9092',
                         'security.protocol': 'plaintext',
                         'group.id': 'services_registry_consumer_test_01',
                         'debug': 'consumer',
                         # A 0.9.0.1 broker does not support ApiVersionRequest (added in 0.10.0),
                         # so tell librdkafka which protocol version to assume instead.
                         'api.version.request': 'false',
                         'broker.version.fallback': '0.9.0.1'}

        self.consumer = AvroConsumer(consumer_conf)

    def run(self):
        sys.stderr.write(
            'Consumer started for topic(s) %s \n' % self.consumer_topic)

        try:
            self.consumer.subscribe(self.consumer_topic, on_assign=self.print_assignment)

            while True:
                incoming_message = self.consumer.poll(1)
                if incoming_message is None:
                    # Nothing arrived within the 1 s poll timeout.
                    continue
                elif incoming_message.error() is None:
                    # value() is already deserialized by the Avro consumer.
                    message_payload = incoming_message.value()
                    sys.stderr.write(
                        '[Topic:%s][Message:%s] \n' % (
                            incoming_message.topic().upper(), message_payload))
                elif incoming_message.error().code() == KafkaError._PARTITION_EOF:
                    # End of partition event
                    sys.stderr.write('%% %s [%d] reached end at offset %d\n' %
                                     (incoming_message.topic(), incoming_message.partition(),
                                      incoming_message.offset()))
                else:
                    sys.stderr.write(
                        ' Consumer Error {}\n'.format(incoming_message.error()))
                    raise KafkaException(incoming_message.error())

        except Exception as whatever_it_is:
            sys.stderr.write(
                ' EXCEPTION %s on the consumer for topic(s) %s\n' % (
                    whatever_it_is, self.consumer_topic))
            sys.exit(1)

        finally:
            # No `return` here: returning from finally would swallow the SystemExit above.
            sys.stderr.write(
                'CLOSING consumer %s with topic(s): %s\n' % (
                    self.consumer, self.consumer_topic))
            self.consumer.close()

    def print_assignment(self, consumer, partitions):
        sys.stderr.write(
            'Topic:%s  Assignment:%s\n' % (
                self.consumer_topic, partitions))
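As written, the poll loop cannot tell "no messages yet" apart from "never joined the group": both just return None from poll(). A minimal sketch of a watchdog that flags the second case, assuming you call `mark_subscribed()` right after `subscribe()`, `mark_assigned()` from the `on_assign` callback, and `is_stuck()` on each loop iteration. The class `AssignmentWatchdog` and its methods are hypothetical, not part of confluent-kafka:

```python
import time
from typing import Callable, Optional


class AssignmentWatchdog:
    """Flags a consumer that subscribed but never received a partition assignment.

    Hypothetical helper; the clock is injectable so it can be tested without waiting.
    """

    def __init__(self, timeout_s: float, clock: Callable[[], float] = time.monotonic):
        self.timeout_s = timeout_s
        self.clock = clock
        self.subscribed_at: Optional[float] = None
        self.assigned = False

    def mark_subscribed(self) -> None:
        # Call right after consumer.subscribe(...).
        self.subscribed_at = self.clock()
        self.assigned = False

    def mark_assigned(self) -> None:
        # Call from the on_assign callback.
        self.assigned = True

    def is_stuck(self) -> bool:
        # Stuck = subscribed, still unassigned, and the timeout has elapsed.
        if self.subscribed_at is None or self.assigned:
            return False
        return self.clock() - self.subscribed_at > self.timeout_s


if __name__ == "__main__":
    fake_now = [0.0]
    dog = AssignmentWatchdog(timeout_s=30, clock=lambda: fake_now[0])
    dog.mark_subscribed()
    fake_now[0] = 31.0
    print(dog.is_stuck())  # True: no assignment within 30 s
```

In the working log the assignment arrives roughly 4 seconds after the subscribe, so a timeout well above that (the 30 s above is an arbitrary choice) separates a slow join from a hung one.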

For example, if I assign the value services_registry_consumer_test_01 to group.id, everything works as expected and I get:

%7|1610549830.729|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS STATIC_LIB_zlib ZLIB SSL SASL_CYRUS STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
[2021-01-13T14:57:11+00:00] Communication thread started for topic(s) ['my-avro-topic'] 
%7|1610549831.591|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_01": subscribe to new subscription of 1 topics (join state init)
%7|1610549831.591|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_01" is rebalancing in state init (join-state init) without assignment: unsubscribe
%7|1610549832.135|JOIN|rdkafka#consumer-1| [thrd:main]: xxxxxxx:9092/4: Joining group "services_registry_consumer_test_01" with 1 subscribed topic(s)
%5|1610549832.135|MAXPOLL|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/4: Broker does not support KIP-62 (requires Apache Kafka >= v0.10.1.0): consumer configuration `max.poll.interval.ms` (300000) is effectively limited by `session.timeout.ms` (6000) with this broker version
%7|1610549835.397|ASSIGNOR|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_01": "range" assignor run for 1 member(s)
[2021-01-13T14:57:15+00:00]Topic:['my-avro-topic']  Assignment:[TopicPartition{topic=my-avro-topic,partition=0,offset=-1001,error=None}, TopicPartition{topic=my-avro-topic,partition=1,offset=-1001,error=None}, TopicPartition{topic=my-avro-topic,partition=2,offset=-1001,error=None}, TopicPartition{topic=my-avro-topic,partition=3,offset=-1001,error=None}, TopicPartition{topic=my-avro-topic,partition=4,offset=-1001,error=None}]
%7|1610549835.488|ASSIGN|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_01": new assignment of 5 partition(s) in join state wait-assign-rebalance_cb
%7|1610549835.488|OFFSET|rdkafka#consumer-1| [thrd:main]: GroupCoordinator/4: Fetch committed offsets for 5/5 partition(s)
%7|1610549835.890|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [3] start fetching at offset 0
%7|1610549835.974|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [4] start fetching at offset 90
%7|1610549835.974|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [0] start fetching at offset 0
%7|1610549835.991|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [1] start fetching at offset 0
%7|1610549835.994|FETCH|rdkafka#consumer-1| [thrd:main]: Partition my-avro-topic [2] start fetching at offset 0
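The MAXPOLL line in the working log says that, because the broker predates KIP-62 (Apache Kafka 0.10.1.0), `max.poll.interval.ms` (300000) is effectively limited by `session.timeout.ms` (6000). A minimal sketch of that rule as the log states it; the function names are hypothetical and versions are compared as dotted integers padded to four parts:

```python
from typing import Tuple

# First broker version with KIP-62 support, as quoted in the MAXPOLL log line.
KIP62_MIN_BROKER: Tuple[int, int, int, int] = (0, 10, 1, 0)


def parse_version(v: str) -> Tuple[int, ...]:
    """'0.9.0.1' -> (0, 9, 0, 1); shorter versions are zero-padded to 4 parts."""
    parts = tuple(int(p) for p in v.split("."))
    return parts + (0,) * (4 - len(parts))


def effective_poll_budget_ms(broker_version: str, session_timeout_ms: int,
                             max_poll_interval_ms: int = 300000) -> int:
    """Approximate limit on the time allowed between poll() calls."""
    if parse_version(broker_version) < KIP62_MIN_BROKER:
        # Pre-KIP-62 broker: max.poll.interval.ms is capped by session.timeout.ms.
        return min(session_timeout_ms, max_poll_interval_ms)
    return max_poll_interval_ms


if __name__ == "__main__":
    print(effective_poll_budget_ms("0.9.0.1", 6000))  # 6000
```

With the configuration above this leaves 6 seconds between poll() calls, which the 1 s poll loop stays well within.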

If I assign the value services_registry_consumer_test_02 to the group.id key, the same consumer that worked before now hangs without ever managing to subscribe to the topic:

%7|1610549718.744|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.5.0 (0x10500ff) rdkafka#consumer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer, STATIC_LINKING GCC GXX PKGCONFIG OSXLD LIBDL PLUGINS STATIC_LIB_zlib ZLIB SSL SASL_CYRUS STATIC_LIB_libzstd ZSTD HDRHISTOGRAM SYSLOG SNAPPY SOCKEM SASL_SCRAM SASL_OAUTHBEARER CRC32C_HW, debug 0x2000)
[2021-01-13T14:55:19+00:00] Communication thread started for topic(s) ['my-avro-topic'] 
%7|1610549719.482|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_02": subscribe to new subscription of 1 topics (join state init)
%7|1610549719.482|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_02" is rebalancing in state init (join-state init) without assignment: unsubscribe
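Comparing the two debug logs by facility (the third `|`-separated field) shows where the hung consumer stops: it never reaches JOIN. A minimal sketch of a parser for librdkafka debug lines in the `%level|timestamp|FACILITY|client| message` shape seen above; the function names are hypothetical, non-matching lines (the application's own timestamped output) are skipped, and the sample lines are shortened copies of the failing log:

```python
import re
from typing import Iterable, List, NamedTuple, Optional

# Matches lines such as:
# %7|1610549719.482|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "..." ...
LOG_RE = re.compile(r"^%(\d)\|(\d+\.\d+)\|([A-Z0-9_]+)\|([^|]*)\|\s?(.*)$")


class LogLine(NamedTuple):
    level: int
    ts: float
    facility: str
    client: str
    message: str


def parse(line: str) -> Optional[LogLine]:
    m = LOG_RE.match(line.strip())
    if m is None:
        return None  # application output, not a librdkafka debug line
    level, ts, fac, client, msg = m.groups()
    return LogLine(int(level), float(ts), fac, client, msg)


def facilities(lines: Iterable[str]) -> List[str]:
    """Distinct facilities in order of first appearance."""
    seen: List[str] = []
    for line in lines:
        rec = parse(line)
        if rec is not None and rec.facility not in seen:
            seen.append(rec.facility)
    return seen


if __name__ == "__main__":
    failing = [
        '%7|1610549718.744|INIT|rdkafka#consumer-1| [thrd:app]: librdkafka v1.5.0 initialized',
        '[2021-01-13T14:55:19+00:00] Communication thread started',
        '%7|1610549719.482|SUBSCRIBE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_02": subscribe',
        '%7|1610549719.482|REBALANCE|rdkafka#consumer-1| [thrd:main]: Group "services_registry_consumer_test_02" is rebalancing',
    ]
    print(facilities(failing))  # ['INIT', 'SUBSCRIBE', 'REBALANCE']
    print("JOIN" in facilities(failing))  # False
```

Run against the working log, the same function also reports JOIN, MAXPOLL, ASSIGNOR, ASSIGN, OFFSET and FETCH, which suggests the hung consumer never gets as far as the group coordinator.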

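If I set the value back to services_registry_consumer_test_01, everything starts working as expected again. I know this may sound strange, but that is exactly what happens, and it has been keeping me up for days.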
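One mechanism worth checking, offered as a hypothesis rather than a confirmed diagnosis: the broker picks a group's coordinator from the group.id itself, as the leader of partition `abs(groupId.hashCode()) % N` of the internal `__consumer_offsets` topic, where N is `offsets.topic.num.partitions` (50 by default). Different group.id values can therefore land on different coordinator brokers, and if one of those partitions has no healthy leader, only the groups that hash to it would hang. A minimal sketch that reproduces Java's `String.hashCode()` in Python to compute that partition, assuming the default of 50 partitions; the function names are hypothetical:

```python
def java_string_hash(s: str) -> int:
    """Java's String.hashCode(): h = 31*h + c over UTF-16 code units, as a signed 32-bit int."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = int.from_bytes(data[i:i + 2], "big")
        h = (31 * h + unit) & 0xFFFFFFFF
    # Reinterpret the unsigned 32-bit value as a signed Java int.
    return h - (1 << 32) if h >= (1 << 31) else h


def offsets_partition_for(group_id: str, num_partitions: int = 50) -> int:
    """__consumer_offsets partition whose leader would coordinate this group."""
    h = java_string_hash(group_id)
    # Kafka's Utils.abs maps Integer.MIN_VALUE to 0 instead of overflowing.
    positive = 0 if h == -(1 << 31) else abs(h)
    return positive % num_partitions


if __name__ == "__main__":
    for gid in ("services_registry_consumer_test_01",
                "services_registry_consumer_test_02"):
        print(gid, offsets_partition_for(gid))
```

The printed partition numbers can then be compared with the output of `kafka-topics --describe --topic __consumer_offsets` (using the connection option your Kafka version requires), looking for partitions whose leader is -1 or whose replicas are out of sync.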

I don't know whether I am doing something wrong here, and I would really appreciate your thoughts on this.

Tags: python-3.x, confluent-kafka-python
