java - 由于 kafka 流存储正在等待运行,应用程序无法启动
问题描述
我有一个使用 kafka 流的 Spring Boot 应用程序(kafka docker image: wurstmeister/kafka:2.12-2.1.1
, kafka dependencies: org.apache.kafka:kafka-streams:2.4.1
)。在应用程序启动期间,我检查是否创建了主题 my-topic,如果没有 - 应用程序创建它。之后该应用程序创建KTable
如下:
streamsBuilder.table("my-topic", Consumed.with(Serdes.String(), Serdes.String()), Materialized.as("my-topic-store"))
此外,我创建商店以查询它:
while(true)
try{
return kafkaStreams.store("my-topic-store", QueryableStoreTypes.keyValueStore())
} catch (InvalidStateStoreException e) {
log.info("Waiting for store {} is RUNNING", "my-topic-store");
Thread.sleep(1000);
}
}
我的应用程序部署在 k8s 中。当新版本的应用程序准备好时,k8s 会启动新的应用程序并缩减旧的应用程序。问题是当新应用程序启动时,我在日志中只看到多行,例如:“Waiting for store my-topic-store is RUNNING。
我试图深入研究这个问题。从kafka文档看,1个partition只能被1个consumer读取,1个consumer可以读取多个partition。如果新的消费者来了并且所有的分区都已经被“占用”了,这个消费者就会变得空闲。在我们的例子中,当新应用启动时,意味着新消费者来了,它变得空闲,因为旧消费者的旧应用仍在工作,因此新消费者无法监听 kafka 分区。我应该注意到,该应用程序为 kafka 流配置了 5 个线程,并且每个主题都有 23 个主题,每个主题都有 1 个分区(我试图将分区号从 1 更改为 5,但没有帮助)。应用程序重新部署是在没有负载的情况下发生的。
解决方案
您所描述的(在评论中)是预期的行为。
当您启动新应用程序时,它将加入消费者组。因为只有一个分区,新应用程序没有分配任何工作(没有理由重新分配工作,因为这只是一个昂贵的状态迁移;请注意,从重新平衡的角度来看,您的应用程序横向扩展了;不知道您打算停止已经存在的应用程序)。
当您最终停止旧应用程序时,会重新分配工作(和状态)。
另请注意,启动新实例永远不会停止任何现有实例。相反,如前所述,它被视为您的应用程序的扩展。
升级应用程序的推荐方法是先停止旧实例,然后在同一台服务器上重新启动新实例,以便它可以从磁盘中获取旧实例的状态。这避免了昂贵的状态迁移。
推荐阅读
- angularjs - 我们可以阻止 AngularJS 将 ng-scope 添加到 Angular 9 组件元素吗?
- c# - IAsyncEnumerable 的早期一次性使用需要永远
- mysql - MySQL Join 和 group by 在同一张表上
- r - 如何过滤 OSM 形状内的所有点?
- reshape - 检查 numpy 数组形状是否为 (),如果是,则重塑为 (1,)
- spring-boot - MapStruct:如何使用 mapstruct 将 String 转换为 byte[]
- android - 如何更改gradle的设置?
- python - 字节类型序列化的python json问题
- android - Mockito lenient() 何时使用
- mongodb - 猫鼬连接抛出 MongooseServerSelectionError