python - 如何描述主题kafka的配置?
问题描述
我想描述一个主题的配置。我使用 confluent-kafka-python librairie(版本 1.5.0)开发了一个脚本,我的 python 版本是 2.7。我的最终目标是能够将保留时间更改为我的主题(retention.ms),但为此我需要提取我的主题的所有配置并仅更改我想要的内容,并将其他内容保留为已定义的内容。
我的脚本:
def describe_topic(admin_client, topic):
resources = [ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, topic)]
fs = admin_client.describe_configs(resources)
for resource, f in fs.items():
remote_config = f.result()
print(remote_config)
return resource, remote_config
但我有这个错误:
错误
文件“kafka.py”,第 192 行,在 describe_topic
资源 = [ConfigResource(confluent_kafka.admin.RESOURCE_TOPIC, topic)]
文件“environment/mamba-6Od8R-HF-py2.7/lib/python2.7/site-packages /kafka/admin/config_resource.py", 第 33 行, init
resource_type = ConfigResourceType[str(resource_type).upper()] # pylint: disable-msg=unsubscriptable-object
File "/product/tedhdev/environment/mamba-6Od8R -HF-py2.7/lib/python2.7/site-packages/enum/ init .py”,第 394 行,在getitem
返回 cls。member_map [名称] KeyError
: '2'\
有人能帮我吗?非常感谢
解决方案
import kafka
from kafka.admin import KafkaAdminClient, ConfigResource, ConfigResourceType
client_conn = kafka.KafkaAdminClient(bootstrap_servers="localhost:9092")
print(client_conn.describe_configs(config_resources=[ConfigResource(ConfigResourceType.TOPIC, "topic")]))
推荐阅读
- azure-devops - How Schedule Triggers work in Azure Devops?
- firebase - 对于具有可变天数的事件,如何构建 Firestore 集合以查询每天的标签?
- node.js - 如何存储承诺的价值并在解决后使用它?
- javascript - Nodejs + Postgresql - 当重复或违反外键时,我如何处理插入多行查询
- laravel - 在特定时间后在 Laravel 中上传图像不会加载或显示在服务器上
- flutter-web - 共享偏好颤动网络问题
- javascript - 如何使用 Joi 为输入字段赋予 null 的初始值?
- android - android studio中的recyclerview项目事故(不良行为)
- ld - ld:禁止更改部分开始
- haskell - 我如何告诉阴谋集团在哪里可以找到亚历克斯和快乐,这样阴谋集团就不会试图建造它们?