python - Kafka Consumer 参数用于显示基于分区的键
问题描述
我正在尝试订阅具有三个不同键的主题 - 例如:alpha、beta、gamma
/broker/kafka/bin/kafka-console-consumer.sh --bootstrap-server "***.***.***:9092,***.***.***:9092,10.***.***.***:9092" --topic connect_offsets --property print.key=true --partition 5
我怎样才能以pythonic方式复制它?
当前代码 -
topic = ['connect_offsets']
from confluent_kafka import (
Consumer,
KafkaError,
KafkaException
)
kafka_conf = {
'bootstrap.servers': '***.***.***:9092,***.***.***:9092,10.***.***.***:9092',
'group.id': 'grp_connect_offsets_monitor',
'auto.offset.reset': 'earliest',
'enable.auto.commit': 'True',
'fetch.min.bytes' : 5242880
}
try:
consumer = Consumer(kafka_conf)
consumer.subscribe(topic)
logging.info(f'Subscribing to : {topic}')
try:
record = consumer.poll(timeout=1.0)
except Exception as e:
logging.error(f'Caught : {e} while polling {topic}')
pass
cleaned_record = record.key().decode('utf-8')
except Exception as e:
logging.error(f'Issue occurred, error is : {e}')
clean_record 只给我一个值,我想打印与之关联的不同 key() 和值。
解决方案
推荐阅读
- html - 如何使用 jinja2 语法从gentlest() 方法中获取单选按钮值?
- docker - 无法在具有 Linux Runner 的 Docker 容器中执行 GO 二进制文件
- python - 如何在 colab 中实现命令行参数?
- python - multiprocessing.Pool().map 工作函数错误中参数的多个值
- r - 如何过滤R中数据框每一列中的NA
- python - Python 脚本能否在其自身死亡时产生一个新进程?
- javascript - 从包含它们的 json 文件中自动化 Dialogflow 中提出的问题和答案
- sql - 如何在 Postgres 9.4 中忽略没有唯一约束的重复项?
- html - 如何填充标签以填充容器内的父 div
- arrays - KOTLIN 比较 n 维数组