首页 > 解决方案 > Kafka Consumer 参数用于显示基于分区的键

问题描述

我正在尝试订阅具有三个不同键的主题 - 例如:alpha、beta、gamma

/broker/kafka/bin/kafka-console-consumer.sh --bootstrap-server "***.***.***:9092,***.***.***:9092,10.***.***.***:9092" --topic connect_offsets  --property print.key=true --partition 5

我怎样才能以pythonic方式复制它?

当前代码 -

topic = ['connect_offsets']
from confluent_kafka import (
                            Consumer,
                            KafkaError,
                            KafkaException
                            )
kafka_conf = {
    'bootstrap.servers': '***.***.***:9092,***.***.***:9092,10.***.***.***:9092',
    'group.id': 'grp_connect_offsets_monitor',
    'auto.offset.reset': 'earliest',
    'enable.auto.commit': 'True',
    'fetch.min.bytes' : 5242880
    }
try:
        consumer = Consumer(kafka_conf)
        consumer.subscribe(topic)
        logging.info(f'Subscribing to : {topic}')
        try:
            record = consumer.poll(timeout=1.0)
        except Exception as e:
            logging.error(f'Caught : {e} while polling {topic}')
            pass
        cleaned_record = record.key().decode('utf-8')

except Exception as e:
        logging.error(f'Issue occurred, error is : {e}')

clean_record 只给我一个值,我想打印与之关联的不同 key() 和值。

标签: pythonkafka-consumer-apiconfluent-platform

解决方案


推荐阅读