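apache-kafka - KafkaConsumer fails to "re-subscribe"
Problem description
My task is to write some Python 3 scripts to test an Apache Kafka system. To that end, one message should be read from every topic-partition pair. The routine subscribes to a topic, seeks to a random offset in one of the topic's partitions, reads one message, and unsubscribes. This means, for example, that it subscribes to, unsubscribes from, and then "re-subscribes" to SAMPLE_TOPIC in order to check partitions 0 and 1. The "re-subscribe" step appears to fail. I have also seen the first subscribe step fail, which I fixed by calling the partitions_for_topic function.
I know I could implement this without unsubscribing and "re-subscribing", but if possible I would like it to work as specified.
So, what is the problem?
Am I missing something? Is this due to the design of Apache Kafka or kafka-python? Is it a bug in one of them? Or something else?
I appreciate any help.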
kafka-python = 2.0.2
Apache Kafka = 2.7.0 (according to kafka-topics --version)
Python 3.8.5
The code below is a simplified variant of the original code, which runs in a GitLab pipeline:
from random import randint

from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
from kafka.structs import TopicPartition


def output_check(kafka: KafkaConsumer, topic_partitions: list, offsets: list):
    tp: TopicPartition
    offset_dict: dict
    for tp, offset_dict in zip(topic_partitions, offsets):
        print("\nOUTPUT_CHECK TOPIC_PARTITION:", tp)
        # Pick a random offset between the recorded start and end offsets.
        offset: int = randint(offset_dict["start"], offset_dict["end"])
        msg: ConsumerRecord = read_single_message(kafka, tp, offset)
        print()
        print(msg)


def read_single_message(kafka: KafkaConsumer, topic_partition: TopicPartition, offset: int):
    print("BEFORE SUBSCRIPTION - TOPIC:", topic_partition.topic)
    kafka.subscribe([topic_partition.topic])
    print("\nAFTER SUBSCRIBE - SUBSCRIPTION:", kafka.subscription())
    print("AFTER SUBSCRIBE - ASSIGNMENT:", kafka.assignment())
    print("AFTER SUBSCRIBE - PARTITIONS FOR TOPIC", kafka.partitions_for_topic(topic_partition.topic))
    assert topic_partition.topic in kafka.subscription(), \
        "The Consumer is not subscribed to the given Topic " + str(topic_partition.topic) + "."
    # <OFFSET CHECK> is a placeholder; the actual check was left out of this simplified variant.
    if <OFFSET CHECK>:
        raise ArithmeticError("Offset {} is not existing in {}".format(offset, topic_partition))
    else:
        print("\nAFTER OFFSET CHECK - SUBSCRIPTION:", kafka.subscription())
        print("AFTER OFFSET CHECK - ASSIGNMENT:", kafka.assignment())
    kafka.seek(topic_partition, offset)
    kafka.poll(max_records=1)  # Poll Messages. Might be unnecessary...
    msg = None
    for msg in kafka:
        break  # Stop after the first record.
    kafka.unsubscribe()
    return msg
Topic names and consumer record data have been changed:
OUTPUT_CHECK TOPIC_PARTITION: TopicPartition(topic='SAMPLE_TOPIC', partition=0)
BEFORE SUBSCRIPTION - TOPIC: SAMPLE_TOPIC
AFTER SUBSCRIBE - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER SUBSCRIBE - ASSIGNMENT: set()
AFTER SUBSCRIBE - PARTITIONS FOR TOPIC {0, 1, 2, 3, 4, 5, 6, 7}
AFTER OFFSET CHECK - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER OFFSET CHECK - ASSIGNMENT: {TopicPartition(topic='SAMPLE_TOPIC', partition=4),
TopicPartition(topic='SAMPLE_TOPIC', partition=7), TopicPartition(topic='SAMPLE_TOPIC', partition=0),
TopicPartition(topic='SAMPLE_TOPIC', partition=6), TopicPartition(topic='SAMPLE_TOPIC', partition=3),
TopicPartition(topic='SAMPLE_TOPIC', partition=2), TopicPartition(topic='SAMPLE_TOPIC', partition=5),
TopicPartition(topic='SAMPLE_TOPIC', partition=1)}
ConsumerRecord(topic='SAMPLE_TOPIC', partition=0, offset=1, timestamp=163345635146,
timestamp_type=0, key=b'SOME_KEY', value=b'SOME AVRO ENCODED MESSAGE', headers=[],
checksum=None, serialized_key_size=12, serialized_value_size=954, serialized_header_size=-1)
OUTPUT_CHECK TOPIC_PARTITION: TopicPartition(topic='SAMPLE_TOPIC', partition=1)
BEFORE SUBSCRIPTION - TOPIC: SAMPLE_TOPIC
AFTER SUBSCRIBE - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER SUBSCRIBE - ASSIGNMENT: set()
AFTER SUBSCRIBE - PARTITIONS FOR TOPIC {0, 1, 2, 3, 4, 5, 6, 7}
AFTER OFFSET CHECK - SUBSCRIPTION: {'SAMPLE_TOPIC'}
AFTER OFFSET CHECK - ASSIGNMENT: set()
Traceback (most recent call last):
  File "test_executor.py", line 88, in <module>
    main(argument_parsing())
  File "test_executor.py", line 58, in main
    BasicKafka.output_check(con_dict)
  File "...", line ..., in output_check
    msg: ConsumerRecord = read_single_message(kafka, tp, offset)
  File "...", line ..., in read_single_message
    kafka.seek(topic_partition, offset)
  File "/usr/lib/python3.8/site-packages/kafka/consumer/group.py", line 818, in seek
    assert partition in self._subscription.assigned_partitions(), 'Unassigned partition'
AssertionError: Unassigned partition
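In the second run of the log above, the assignment is still `set()` when `seek()` is called, and that is exactly the condition the `Unassigned partition` assertion checks. A minimal sketch of one way to guard against it, assuming that kafka-python only completes the group assignment while `poll()` is running: keep polling until the partition appears in `assignment()`. The helper name `wait_for_assignment` and its `timeout_s` and `poll_ms` parameters are hypothetical and not part of kafka-python; only `assignment()` and `poll()` are library calls.

```python
import time


def wait_for_assignment(consumer, topic_partition, timeout_s=30.0, poll_ms=500):
    """Poll until `topic_partition` is in consumer.assignment().

    Returns True once the partition is assigned, False if the timeout
    expires first. `consumer` is expected to behave like a KafkaConsumer.
    """
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        if topic_partition in consumer.assignment():
            return True
        # Assumption: poll() is what drives the group join/rebalance.
        # Records returned here are discarded, which is acceptable when
        # the caller seeks to an explicit offset afterwards.
        consumer.poll(timeout_ms=poll_ms)
    # One final check in case the last poll() completed the assignment.
    return topic_partition in consumer.assignment()
```

It could be called right after `kafka.subscribe(...)` and before `kafka.seek(...)`. If it returns False, the rebalance may not have finished within the timeout, or the partition may have been assigned to another member of the same consumer group.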
Solution
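Since the goal is to read from one specific partition at a chosen offset, manual assignment with `assign()` may be a better fit than `subscribe()`, because it does not depend on a consumer-group rebalance. The following is a sketch under stated assumptions, not a confirmed fix for the behavior above. It assumes the consumer is not currently subscribed via `subscribe()`. The function name `read_single_message_assigned`, the `timeout_ms` default, and the range check built on `beginning_offsets()` / `end_offsets()` are illustrative choices and do not reproduce the offset check left out of the question. The `namedtuple` fallback only exists so the sketch can be read and exercised without kafka-python installed.

```python
from collections import namedtuple

try:
    from kafka.structs import TopicPartition  # kafka-python
except ImportError:
    # Fallback with the same fields, so the sketch runs without kafka-python.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def read_single_message_assigned(consumer, topic_partition, offset, timeout_ms=5000):
    """Read one record from `topic_partition` starting at `offset`.

    Uses manual assignment instead of subscribe()/unsubscribe().
    Returns the record, or None if nothing arrived within `timeout_ms`.
    """
    # Manual assignment: takes effect immediately, no group rebalance needed.
    consumer.assign([topic_partition])

    # Illustrative range check: end_offsets() reports the offset of the
    # next message to be written, so valid offsets are start <= offset < end.
    start = consumer.beginning_offsets([topic_partition])[topic_partition]
    end = consumer.end_offsets([topic_partition])[topic_partition]
    if not start <= offset < end:
        raise ValueError(
            "Offset {} is outside [{}, {}) for {}".format(offset, start, end, topic_partition)
        )

    consumer.seek(topic_partition, offset)
    # poll() returns a dict mapping TopicPartition to a list of records.
    batch = consumer.poll(timeout_ms=timeout_ms, max_records=1)
    records = batch.get(topic_partition, [])
    return records[0] if records else None
```

With a real `KafkaConsumer`, the function could be called once per `TopicPartition` on the same consumer instance, since each `assign()` call replaces the previous assignment. On a compacted topic the record returned may have a higher offset than the one requested.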