首页 > 解决方案 > StreamsException:无法初始化状态,如果 Kafka Streams 的多个实例在同一个状态目录中运行,可能会发生这种情况

问题描述

这是关于在生产中升级现有代码库,它使用从 kafka-clients、kafka-streams、spring-kafka 2.4.0 到 2.6.x 的窗口,以及将 spring-boot-starter-parent 从 2.2.2.RELEASE 升级到 2.3.x因为 2.2 与 kafka-streams 2.6 不兼容。

现有代码在旧版本(2.4.0、2.2 春季版本)中提到了以下这些 bean:

@Bean("DataCompressionCustomTopology")
public Topology customTopology(@Qualifier("CustomFactoryBean") StreamsBuilder streamsBuilder)  {
 //Your topology code
 return streamsBuilder.build();
}
    
@Bean("GenericKafkaStreams")
public KafkaStreams kStream() {
//Your kafka streams code
return kafkaStreams;
}

现在升级 kafka 流,kafka 客户端到 2.6.2 和 spring kafka 到 2.6.x 后,观察到以下异常:

2021-05-13 12:33:51.954 [Persistence-Realtime-Transformation] [main] WARN   o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'CustomFactoryBean'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
   

标签: spring-bootjava-8apache-kafka-streamsspring-kafka

解决方案


如果您的 Spring Cloud Streams Kafka Streams Binder 3.0 风格的应用程序中有一个复杂的 Kafka Streams 拓扑,您可能需要为不同的功能指定不同的应用程序 ID,如下所示:

spring.cloud.stream.function.definition: myFirstStream;mySecondStream
...
spring.cloud.stream.kafka.streams:
  binder:
    functions:
      myFirstStream:
        applicationId: app-id-1
      mySecondStream:
        applicationId: app-id-2

推荐阅读