python - 如何在使用 kafka-python 创建主题时添加主题级别日志保留期
问题描述
我目前正在使用 kafka-python==2.0.1,
from kafka import KafkaAdminClient
from kafka.admin import NewTopic
topic_name = "retention_test"
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])
topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=3, topic_configs={'log.retention.hours' : '100'})
response = admin.create_topics([topic])
print(response)
但无法创建主题,并引发以下错误 -
raise error_type(
kafka.errors.InvalidConfigurationError: [Error 40] InvalidConfigurationError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='retention_test', num_partitions=1, replication_factor=3, replica_assignment=[], configs=[(config_key='log.retention.hours', config_value=100)])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='retention_test', error_code=40, error_message='Unknown topic config name: log.retention.hours')])
解决方案
克里希纳,
据我所知,您提供给“NewTopic”的“NewTopics”对象不允许主题属性“log.retention.hours”。log.retention.hours 是代理的属性,在创建主题时用作默认值。当您更改主题的配置时,您应该指定一个主题级别的属性。
日志保留时间的主题级属性是retention.ms。
参考资料 (1) https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html (2) http://kafka.apache.org/081/documentation.html#topic-config (3)交叉引用:在运行时更改 kafka 保留期
推荐阅读
- java - 如何在 Swing 中正确移动绘制的多边形
- nuget - nuger.server 的 FindPackagesById() 不返回预发布包
- javascript - 给出 input.value 和 input.textContent 之间的区别。为什么用一种代替另一种?
- ruby-on-rails - ActionCable / Websocket 不适用于 Puma
- angular - 带有子查询的 AngularFireStore 返回集合
- postgresql - docker 中的鸡、鸡蛋和 postgres(引导 = 不能做的循环)
- tcp - Raspberry 与多个 Arduino 的长距离线路通信
- python - 将日期输入显示从 m/d/yy 扩展到 mm/dd/yyyy?
- php - 我的 .htaccess 破坏了我的 laravel 网站
- linux - Linux 命令循环