首页 > 解决方案 > 如何在使用 kafka-python 创建主题时添加主题级别日志保留期

问题描述

我目前正在使用 kafka-python==2.0.1,

from kafka import KafkaAdminClient
from kafka.admin import NewTopic

topic_name = "retention_test"
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

topic = NewTopic(name=topic_name, num_partitions=1, replication_factor=3, topic_configs={'log.retention.hours' : '100'})
response = admin.create_topics([topic])
print(response)

但无法创建主题,并引发以下错误 -

raise error_type(
kafka.errors.InvalidConfigurationError: [Error 40] InvalidConfigurationError: Request 'CreateTopicsRequest_v3(create_topic_requests=[(topic='retention_test', num_partitions=1, replication_factor=3, replica_assignment=[], configs=[(config_key='log.retention.hours', config_value=100)])], timeout=30000, validate_only=False)' failed with response 'CreateTopicsResponse_v3(throttle_time_ms=0, topic_errors=[(topic='retention_test', error_code=40, error_message='Unknown topic config name: log.retention.hours')])

标签: pythonapache-kafkakafka-python

解决方案


克里希纳,

据我所知,您提供给“NewTopic”的“NewTopics”对象不允许主题属性“log.retention.hours”。log.retention.hours 是代理的属性,在创建主题时用作默认值。当您更改主题的配置时,您应该指定一个主题级别的属性。

日志保留时间的主题级属性是retention.ms。

参考资料 (1) https://kafka-python.readthedocs.io/en/master/apidoc/KafkaAdminClient.html (2) http://kafka.apache.org/081/documentation.html#topic-config (3)交叉引用:在运行时更改 kafka 保留期


推荐阅读