首页 > 解决方案 > Kafka 流中的更改日志主题 - 设置或更改分区

问题描述

我们有一个流处理器应用程序,它使用来自具有 n 个分区 (n > 1) 的主题的数据。

从新开始(没有更新日志主题),开发环境总是创建一个包含 n 个分区的更新日志主题。

在相同的场景中,在生产环境中,分区数始终等于 1,然后我们手动更改为 n 以匹配主题的分区数。

我检查了所有文档,尝试为更改日志设置分区数,但找不到任何方法。我的最后一个选择是检查更改日志主题是否不存在,然后我使用 n 个分区创建它。

由于框架自动创建该主题,是否有任何方法可以设置更改日志的分区数,而无需手动或在代码中创建该主题?

PS:我们使用的是 Kafka 客户端版本 2.3.1。

谢谢,

奥斯汀

标签: apache-kafkastream-processing

解决方案


我刚刚查看了源代码以了解此功能的详细信息,在撰写本文时,事实证明设置change-logs主题的分区是不允许的。

解释

change-logs主题被归类为内部主题,并且在以下 2 类(InternalTopicConfigInternalTopicManager)中有证据证明这一点:

  1. InternalTopicConfig类的源代码包含以下方法,该方法还表示强制执行此类内部主题的分区数:

    public void setNumberOfPartitions(final int numberOfPartitions) {
    if (hasEnforcedNumberOfPartitions()) {
        throw new UnsupportedOperationException("number of partitions are enforced on topic " +
                                                "" + name() + " and can't be altered.");
    ...
    
  2. InternalTopicManager类的源代码中的嵌入式文档清楚地说明了该makeReady()方法。

    /**
    * Prepares a set of given internal topics.
    *
    * If a topic does not exist creates a new topic.
    * If a topic with the correct number of partitions exists ignores it.
    * If a topic exists already but has different number of partitions we fail and throw exception requesting user to reset the app before restarting again.
    * @return the set of topics which had to be newly created
    */
    public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) 
    ...
    

正如您在评论中看到的那样,如果存在这样一个分区计数正确的主题,它将被忽略,如果分区计数不正确,那么您会看到错误,建议使用应用程序重置工具

希望这可以帮助!


推荐阅读