首页 > 解决方案 > 由于 kafka 流存储正在等待运行,应用程序无法启动

问题描述

我有一个使用 kafka 流的 Spring Boot 应用程序(kafka docker image: wurstmeister/kafka:2.12-2.1.1, kafka dependencies: org.apache.kafka:kafka-streams:2.4.1)。在应用程序启动期间,我检查是否创建了主题 my-topic,如果没有 - 应用程序创建它。之后该应用程序创建KTable如下:

streamsBuilder.table("my-topic", Consumed.with(Serdes.String(), Serdes.String()), Materialized.as("my-topic-store"))

此外,我创建商店以查询它:

while(true)
    try{
        return kafkaStreams.store("my-topic-store", QueryableStoreTypes.keyValueStore()) 
    } catch (InvalidStateStoreException e) {
        log.info("Waiting for store {} is RUNNING", "my-topic-store");
        Thread.sleep(1000);
    }
}

我的应用程序部署在 k8s 中。当新版本的应用程序准备好时,k8s 会启动新的应用程序并缩减旧的应用程序。问题是当新应用程序启动时,我在日志中只看到多行,例如:“Waiting for store my-topic-store is RUNNING。

我试图深入研究这个问题。从kafka文档看,1个partition只能被1个consumer读取,1个consumer可以读取多个partition。如果新的消费者来了并且所有的分区都已经被“占用”了,这个消费者就会变得空闲。在我们的例子中,当新应用启动时,意味着新消费者来了,它变得空闲,因为旧消费者的旧应用仍在工作,因此新消费者无法监听 kafka 分区。我应该注意到,该应用程序为 kafka 流配置了 5 个线程,并且每个主题都有 23 个主题,每个主题都有 1 个分区(我试图将分区号从 1 更改为 5,但没有帮助)。应用程序重新部署是在没有负载的情况下发生的。

标签: javaspring-bootapache-kafkaapache-kafka-streams

解决方案


您所描述的(在评论中)是预期的行为。

当您启动新应用程序时,它将加入消费者组。因为只有一个分区,新应用程序没有分配任何工作(没有理由重新分配工作,因为这只是一个昂贵的状态迁移;请注意,从重新平衡的角度来看,您的应用程序横向扩展了;不知道您打算停止已经存在的应用程序)。

当您最终停止旧应用程序时,会重新分配工作(和状态)。

另请注意,启动新实例永远不会停止任何现有实例。相反,如前所述,它被视为您的应用程序的扩展。

升级应用程序的推荐方法是先停止旧实例,然后在同一台服务器上重新启动新实例,以便它可以从磁盘中获取旧实例的状态。这避免了昂贵的状态迁移。


推荐阅读