Kafka Streams multiple binder configuration not working

Problem description

I am trying to use two Kafka Streams binders in the same application, but something is wrong: the configuration of the second binder, which has a different applicationId, is never picked up.

application.yml

First binder configuration:

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.bindings.messagetoProcess-in-0.consumer.application-id: ID1
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.application-id: ID2
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.minPartitionCount: 3
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.replicationFactor: 2
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers: hub.corp:9092,hub.corp:9092,hub.corp:9092
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.acks: all
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.compression.type: gzip
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.max.request.size: 104857600
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.cache.max.bytes.buffering: 104857600
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 20000
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.windowstore.changelog.additional.retention.ms: 86400000
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee: exactly_once
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.replication.factor: 2
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde


spring.cloud.stream.bindings.messagetoProcess-in-0.destination: test_messagetoProcess
spring.cloud.stream.bindings.messagetoProcess-in-0.group: test_consumer_messagetoProcess
spring.cloud.stream.bindings.messagetoProcess-in-0.binder: kafka1

spring.cloud.stream.bindings.messagetoProcess-out-0.destination: test_processedMessages
spring.cloud.stream.bindings.messagetoProcess-out-0.binder: kafka1
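The flattened properties above are hard to scan; the same kafka1 binder expressed as nested YAML would look roughly like the sketch below (a re-nesting of the exact keys above, assuming the standard Spring Cloud Stream property layout; some repeated `configuration` keys are elided):

```yaml
spring:
  cloud:
    stream:
      binders:
        kafka1:
          type: kstream
          environment:
            spring.cloud.stream.kafka.streams:
              binder:
                application-id: ID2
                autoAddPartitions: true
                minPartitionCount: 3
                replicationFactor: 2
                serdeError: logAndContinue
                brokers: hub.corp:9092,hub.corp:9092,hub.corp:9092
                configuration:
                  processing.guarantee: exactly_once
                  # ... remaining configuration keys as in the flat form above
              bindings:
                messagetoProcess-in-0:
                  consumer:
                    application-id: ID1
      bindings:
        messagetoProcess-in-0:
          destination: test_messagetoProcess
          group: test_consumer_messagetoProcess
          binder: kafka1
        messagetoProcess-out-0:
          destination: test_processedMessages
          binder: kafka1
```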

Second binder configuration:

spring.cloud.stream.binders.kafka2.type: kstream
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.bindings.richMessagetoProcess-in-0.consumer.application-id: ID3
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.application-id: ID4
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.minPartitionCount: 3
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.replicationFactor: 2
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers: hub.corp:9092,hub.corp:9092,hub.corp:9092
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.acks: all
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.compression.type: gzip
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.max.request.size: 104857600
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.cache.max.bytes.buffering: 104857600
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.windowstore.changelog.additional.retention.ms: 86400000
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee: exactly_once
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.replication.factor: 2
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde


spring.cloud.stream.bindings.richMessagetoProcess-in-0.destination: test_richMessagetoProcess
spring.cloud.stream.bindings.richMessagetoProcess-in-0.group: test_consumer_richMessagetoProcess
spring.cloud.stream.bindings.richMessagetoProcess-in-0.binder: kafka2

spring.cloud.stream.bindings.richMessagetoProcess-out-0.destination: test_richProcessedMessages
spring.cloud.stream.bindings.richMessagetoProcess-out-0.binder: kafka2

I also have two state stores:

aggregation.messagetoProcess.stateStoreName: test_messagetoProcess
aggregation.richMessagetoProcess.stateStoreName: test_RichmessagetoProcess
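With two independent Kafka Streams processors, Spring Cloud Stream also needs to be told which function beans to bind; a minimal sketch, assuming functional-style processors whose bean names match the binding prefixes above (`messagetoProcess` and `richMessagetoProcess` are assumed bean names, not confirmed by the question, and the exact property name can vary between framework versions):

```yaml
spring:
  cloud:
    function:
      # semicolon-separated list of the function beans to bind
      definition: messagetoProcess;richMessagetoProcess
```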

When my application starts, I see the following changelog and repartition topic names in the Kafka broker:

ID1-test_messagetoProcess-changelog
ID1-test_messagetoProcess-repartition

ID2-test_RichmessagetoProcess-changelog
ID2-test_RichmessagetoProcess-repartition

The kafka2 binder is not used, and the applicationIds ID3 and ID4 never appear in the logs. Binder 1's configuration is applied to both streams… but I need a different configuration for each binder.

Questions:

Am I missing something? How can I get the desired behavior?

Also, when my stream performs aggregations with multiple instances (3 pods in OpenShift), only one of them does the aggregating; in theory, with this configuration, all 3 pods should be working at the same time.

Thanks!

Tags: java, spring-boot, apache-kafka, apache-kafka-streams, spring-cloud-stream

Solution

