KafkaConsumer fails to "re-subscribe"

Problem description

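My task is to write some Python 3 scripts to test an Apache Kafka system. To that end, one message should be read from each topic-partition pair. The routine subscribes to a topic, seeks to a random offset in the topic partition, reads one message, and unsubscribes. This means, for example, that it subscribes to, unsubscribes from, and then "re-subscribes" to SAMPLE_TOPIC in order to check partitions 0 and 1. The "re-subscribe" step appears to fail. I also ran into a case where the first subscribe step failed, which I fixed by calling partitions_for_topic.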

I know I could implement this without unsubscribing and "re-subscribing", but if possible I would like it to work as specified.

So what is the problem?
Am I missing something? Or is this due to the design of Apache Kafka / kafka-python? Or is it a bug in one of them? Or something else?
I would appreciate any help.
kafka-python = 2.0.2
Apache Kafka = 2.7.0 (according to kafka-topics --version)
Python = 3.8.5

The code below is a simplified variant of the original code, which runs in a GitLab pipeline:

from random import randint
from kafka.consumer.fetcher import ConsumerRecord
from kafka.structs import TopicPartition
from kafka import KafkaConsumer

def output_check(kafka: KafkaConsumer, topic_partitions: list, offsets: list):    
    tp: TopicPartition
    offset_dict: dict
    for tp, offset_dict in zip(topic_partitions, offsets):
        print("\nOUTPUT_CHECK TOPIC_PARTITION:", tp)
        offset: int = randint(offset_dict["start"], offset_dict["end"])
        msg: ConsumerRecord = read_single_message(kafka, tp, offset)
        print()
        print(msg)

def read_single_message(kafka: KafkaConsumer, topic_partition: TopicPartition, offset: int):
    print("BEFORE SUBSCRIPTION - TOPIC:", topic_partition.topic)
    kafka.subscribe([topic_partition.topic])
    print("\nAFTER SUBSCRIBE - SUBSCRIPTION:", kafka.subscription())
    print("AFTER SUBSCRIBE - ASSIGNMENT:", kafka.assignment())
    print("AFTER SUBSCRIBE - PARTITIONS FOR TOPIC", kafka.partitions_for_topic(topic_partition.topic))

    assert topic_partition.topic in kafka.subscription(), \
        "The Consumer is not subscribed to the given Topic" + str(topic_partition.topic) + "."
    if <OFFSET CHECK>:
        raise ArithmeticError("Offset {} is not existing in {}".format(offset, topic_partition))
    else:
        print("\nAFTER OFFSET CHECK - SUBSCRIPTION:", kafka.subscription())
        print("AFTER OFFSET CHECK - ASSIGNMENT:", kafka.assignment())
        kafka.seek(topic_partition, offset)

    kafka.poll(max_records=1)  # Poll Messages. Might be unnecessary...
    msg = None
    for msg in kafka:
        break
    kafka.unsubscribe()
    return msg

The output (topic names and consumer record data have been changed):

OUTPUT_CHECK TOPIC_PARTITION: TopicPartition(topic='SAMPLE_TOPIC', partition=0)
BEFORE SUBSCRIPTION - TOPIC: SAMPLE_TOPIC

AFTER SUBSCRIBE - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER SUBSCRIBE - ASSIGNMENT: set()
AFTER SUBSCRIBE - PARTITIONS FOR TOPIC {0, 1, 2, 3, 4, 5, 6, 7}

AFTER OFFSET CHECK - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER OFFSET CHECK - ASSIGNMENT: {TopicPartition(topic='SAMPLE_TOPIC', partition=4),
    TopicPartition(topic='SAMPLE_TOPIC', partition=7), TopicPartition(topic='SAMPLE_TOPIC', partition=0),
    TopicPartition(topic='SAMPLE_TOPIC', partition=6), TopicPartition(topic='SAMPLE_TOPIC', partition=3),
    TopicPartition(topic='SAMPLE_TOPIC', partition=2), TopicPartition(topic='SAMPLE_TOPIC', partition=5),
    TopicPartition(topic='SAMPLE_TOPIC', partition=1)}

ConsumerRecord(topic='SAMPLE_TOPIC', partition=0, offset=1, timestamp=163345635146,
    timestamp_type=0, key=b'SOME_KEY', value=b'SOME AVRO ENCODED MESSAGE', headers=[],
    checksum=None, serialized_key_size=12, serialized_value_size=954, serialized_header_size=-1)

OUTPUT_CHECK TOPIC_PARTITION: TopicPartition(topic='SAMPLE_TOPIC', partition=1)
BEFORE SUBSCRIPTION - TOPIC: SAMPLE_TOPIC

AFTER SUBSCRIBE - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER SUBSCRIBE - ASSIGNMENT: set()
AFTER SUBSCRIBE - PARTITIONS FOR TOPIC {0, 1, 2, 3, 4, 5, 6, 7}

AFTER OFFSET CHECK - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER OFFSET CHECK - ASSIGNMENT: set()
Traceback (most recent call last):
  File "test_executor.py", line 88, in <module>
    main(argument_parsing())
  File "test_executor.py", line 58, in main
    BasicKafka.output_check(con_dict)
  File "...", line ..., in output_check
    msg: ConsumerRecord = read_single_message(kafka, tp, offset)
  File "...", line ..., in read_single_message
    kafka.seek(topic_partition, offset)
  File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 818, in seek
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition

Tags: apache-kafka, python-3.8, kafka-python

Solution
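The question mentions that the check could be done without unsubscribing and re-subscribing. One way to do that, offered here as a minimal sketch and not as a confirmed fix, is manual assignment: assign() takes effect immediately and replaces the previous manual assignment, so seek() does not depend on a group rebalance. The timeout_ms default is an assumption, as is the fallback TopicPartition definition, which only exists so the snippet can be read without kafka-python installed. Note that kafka-python does not allow mixing subscribe() and assign() on the same consumer, so this assumes a consumer that never calls subscribe().

```python
from collections import namedtuple

try:
    from kafka.structs import TopicPartition  # provided by kafka-python
except ImportError:
    # Assumption: stand-in with the same fields, used only if kafka-python is absent.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def read_single_message(consumer, topic_partition, offset, timeout_ms=5000):
    """Read one record at `offset` from `topic_partition` via manual assignment.

    `consumer` is expected to behave like kafka.KafkaConsumer and must not
    have been used with subscribe().
    """
    # Manual assignment is applied right away and replaces any earlier one,
    # so the partition is assigned before seek() is called.
    consumer.assign([topic_partition])
    consumer.seek(topic_partition, offset)

    # poll() returns a dict mapping TopicPartition to a list of records.
    batch = consumer.poll(timeout_ms=timeout_ms, max_records=1)
    records = batch.get(topic_partition, [])
    return records[0] if records else None
```

With this shape, output_check can call read_single_message once per TopicPartition on the same consumer. A return value of None means no record arrived within the timeout.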

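If subscribe() has to stay, the traceback shows that seek() asserts the partition is already in the consumer's assignment, and the first run's output shows the assignment is still empty directly after subscribe(). The sketch below makes that precondition explicit by polling until the partition shows up or a deadline passes. The helper name wait_for_assignment and its timeout parameters are hypothetical, and it is not verified that an assignment ever arrives after an unsubscribe() followed by a new subscribe(); in that case the helper at least fails with a clear timeout instead of an AssertionError inside seek().

```python
import time


def wait_for_assignment(consumer, topic_partition, timeout_s=10.0, poll_ms=100):
    """Poll until `topic_partition` is assigned to `consumer`, or raise TimeoutError.

    `consumer` is expected to behave like kafka.KafkaConsumer after subscribe().
    """
    deadline = time.monotonic() + timeout_s
    while topic_partition not in consumer.assignment():
        if time.monotonic() >= deadline:
            raise TimeoutError(
                "{} was not assigned within {} s".format(topic_partition, timeout_s)
            )
        # Group assignment is carried out during poll(). Any records returned
        # here are discarded, which is acceptable because the caller seeks to
        # an explicit offset afterwards.
        consumer.poll(timeout_ms=poll_ms, max_records=1)
```

It would be called between kafka.subscribe(...) and kafka.seek(...). If other consumers share the same group, the wanted partition may be assigned to one of them, which also ends in the timeout.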
