Spring Cloud Stream (Kafka Streams binder) application cannot support multiple processors

Problem description

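We wrote a basic stream processor using Spring Cloud Stream (3.0.9.RELEASE) and the Kafka Streams binder. The actual project contains several classes annotated with @SpringBootApplication, each with its own function, but only one of them is used at a time.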

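The problem we are running into is that when we bundle the application into a JAR and run it as a k8s deployment, running more than 1 replica at a time causes the extra replicas to terminate after a few minutes, with no meaningful error message or obvious cause. For example, if we run 5 replicas, 1 of them stays up indefinitely and processes events, but the other 4 (even though they also start out processing events) exit shortly afterwards. There is no problem if we set the deployment to run only 1 replica, but the processor needs to be scalable.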

This section of the documentation on setting the application ID seems relevant:

For production deployments, it is highly recommended to explicitly specify the application ID
through configuration. This is especially going to be very critical if you are auto scaling your
application in which case you need to make sure that you are deploying each instance with the
same application ID.

But neither setting this value at the binding level (for our normalize function):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            functions:
              normalize:
                applicationId: normalizer-full

nor setting it at the binder level:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            applicationId: normalizer-full

seems to keep the subsequent pods running.
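As background to the quoted recommendation: Kafka Streams uses the application ID as the consumer group ID, so replicas started with the same ID split the input topic's partitions between them. The sketch below only illustrates that idea with a plain round-robin spread. It is not Kafka's actual assignor, and the class name, method name, and the partition and replica counts are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified illustration only: this is NOT Kafka's real partition assignor.
class SharedApplicationIdSketch {

    // Spread partitions 0..partitionCount-1 round-robin over the instances of one group.
    static List<List<Integer>> assign(int partitionCount, int instanceCount) {
        List<List<Integer>> perInstance = new ArrayList<>();
        for (int i = 0; i < instanceCount; i++) {
            perInstance.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            perInstance.get(p % instanceCount).add(p);
        }
        return perInstance;
    }

    public static void main(String[] args) {
        // Hypothetical numbers: 5 replicas sharing one application ID, 3 input partitions.
        List<List<Integer>> result = assign(3, 5);
        for (int i = 0; i < result.size(); i++) {
            System.out.println("replica-" + i + " -> partitions " + result.get(i));
        }
    }
}
```

With these assumed numbers, two of the five replicas receive no partitions. Whether anything like this applies to our deployment depends on the partition count of the real input topic, which is not shown here.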

Any help is appreciated.

- EDIT -

Here is the trace logging from when a processor shuts down after a few minutes:

2020-11-20 06:32:09.721 DEBUG 1 --- [extShutdownHook] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@34f7cfd9, started on Fri Nov 20 06:26:59 GMT 2020
2020-11-20 06:32:09.722 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'org.springframework.integration.config.IdGeneratorConfigurer#0'
2020-11-20 06:32:09.722 DEBUG 1 --- [extShutdownHook] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@5e1dde44, started on Fri Nov 20 06:27:23 GMT 2020, parent: org.springframework.context.annotation.AnnotationConfigApplicationContext@34f7cfd9
2020-11-20 06:32:09.722 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'lifecycleProcessor'
2020-11-20 06:32:09.723 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Destroying singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@38830ea: defining beans [org.springframework.context.annotation.internalConfigurationAnnotationProcessor,org.springframework.context.annotation.internalAutowiredAnnotationProcessor,org.springframework.context.annotation.internalCommonAnnotationProcessor,org.springframework.context.event.internalEventListenerProcessor,org.springframework.context.event.internalEventListenerFactory,KStreamBinderConfiguration,org.springframework.boot.autoconfigure.internalCachingMetadataReaderFactory,org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration,org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration,org.springframework.boot.context.properties.ConfigurationPropertiesBindingPostProcessor,org.springframework.boot.context.internalConfigurationPropertiesBinderFactory,org.springframework.boot.context.internalConfigurationPropertiesBinder,org.springframework.boot.context.properties.BoundConfigurationProperties,org.springframework.boot.context.properties.ConfigurationBeanFactoryMetadata,org.springframework.cloud.stream.binder.kafka.streams.MultiBinderPropertiesConfiguration,provisioningProvider,kStreamBinder]; parent: org.springframework.beans.factory.support.DefaultListableBeanFactory@7cb502c
2020-11-20 06:32:09.723 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Retrieved dependent beans for bean 'org.springframework.boot.autoconfigure.internalCachingMetadataReaderFactory': [org.springframework.context.annotation.internalConfigurationAnnotationProcessor]
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'lifecycleProcessor'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'outputBindingLifecycle'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'inputBindingLifecycle'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'streamsBuilderFactoryManager'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean '_org.springframework.integration.errorLogger'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'integrationHeaderChannelRegistry'
2020-11-20 06:32:09.724 TRACE 1 --- [extShutdownHook] o.s.b.f.s.DefaultListableBeanFactory     : Returning cached instance of singleton bean 'stream-builder-normalize'
2020-11-20 06:32:09.724 DEBUG 1 --- [extShutdownHook] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483547
...
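
One observation on the log above, offered as a sketch rather than a diagnosis: Spring Boot's default console pattern prints the thread name as `%15.15t`, which keeps only the last 15 characters of the name. Assuming that default pattern is in use, `extShutdownHook` is what remains of `SpringContextShutdownHook`, the thread Spring registers as a JVM shutdown hook. That would suggest the context is being closed because the JVM itself is shutting down (for example after a termination signal or a `System.exit` call) rather than because application code closed it directly. The class and the helper `lastChars` are hypothetical and only mimic the truncation.

```java
// Mimics how a "%15.15t" log column shortens long thread names:
// names longer than the width lose characters from the beginning.
class ThreadColumnSketch {

    // Keep at most `width` trailing characters of the thread name.
    static String lastChars(String threadName, int width) {
        if (threadName.length() <= width) {
            return threadName;
        }
        return threadName.substring(threadName.length() - width);
    }

    public static void main(String[] args) {
        // Name of the shutdown hook thread registered by Spring's application context.
        String hookThread = "SpringContextShutdownHook";
        System.out.println("[" + lastChars(hookThread, 15) + "]");
    }
}
```

Running it prints `[extShutdownHook]`, matching the thread column in the trace.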

Tags: java, spring, apache-kafka-streams, spring-cloud-stream
