java - Kafka Streams多个活页夹配置不起作用
问题描述
我试图在同一个应用程序中使用两个 Kafka Streams binder,但我做错了,因为我无法使用具有不同 applicationId 的第二个 binder 的配置。
应用程序.yml
第一个活页夹配置:
spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.bindings.messagetoProcess-in-0.consumer.application-id: ID1
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.application-id: ID2
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.minPartitionCount: 3
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.replicationFactor: 2
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers: hub.corp:9092,hub.corp:9092,hub.corp:9092
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.acks: all
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.compression.type: gzip
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.max.request.size: 104857600
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.cache.max.bytes.buffering: 104857600
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 20000
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.windowstore.changelog.additional.retention.ms: 86400000
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee: exactly_once
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.replication.factor: 2
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.messagetoProcess-in-0.destination: test_messagetoProcess
spring.cloud.stream.bindings.messagetoProcess-in-0.group: test_consumer_messagetoProcess
spring.cloud.stream.bindings.messagetoProcess-in-0.binder: kafka1
spring.cloud.stream.bindings.messagetoProcess-out-0.destination: test_processedMessages
spring.cloud.stream.bindings.messagetoProcess-out-0.binder: kafka1
第二个活页夹配置:
spring.cloud.stream.binders.kafka2.type: kstream
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.bindings.richMessagetoProcess-in-0.consumer.application-id: ID3
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.application-id: ID4
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.autoAddPartitions: true
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.minPartitionCount: 3
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.replicationFactor: 2
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.serdeError: logAndContinue
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers: hub.corp:9092,hub.corp:9092,hub.corp:9092
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.acks: all
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.compression.type: gzip
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.max.request.size: 104857600
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.cache.max.bytes.buffering: 104857600
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 1000
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.windowstore.changelog.additional.retention.ms: 86400000
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.processing.guarantee: exactly_once
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.replication.factor: 2
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.richMessagetoProcess-in-0.destination: test_richMessagetoProcess
spring.cloud.stream.bindings.richMessagetoProcess-in-0.group: test_consumer_richMessagetoProcess
spring.cloud.stream.bindings.richMessagetoProcess-in-0.binder: kafka2
spring.cloud.stream.bindings.richMessagetoProcess-out-0.destination: test_richProcessedMessages
spring.cloud.stream.bindings.richMessagetoProcess-out-0.binder: kafka2
另外我有两个 stateStores
aggregation.messagetoProcess.stateStoreName: test_messagetoProcess
aggregation.richMessagetoProcess.stateStoreName: test_RichmessagetoProcess
当我的应用程序在 Kafka Broker 中启动时,我会看到下一个更改日志和重新分区名称:
ID1-test_messagetoProcess-changelog
ID1-test_messagetoProcess-repartition
ID2-test_RichmessagetoProcess-changelog
ID2-test_RichmessagetoProcess-repartition
未使用 kafka2 binder 且applicationId ID3、ID4 不存在于日志中。对于这两个流,都应用了 binder 1 的配置……但我需要为每个 binder 进行不同的配置。
问题:
我有事吗?我怎样才能得到想要的行为?
此外,当我的流与多个实例(Openshift 中的 3 个 POD)进行聚合时,只有其中一个进行聚合,理论上使用这种配置,3 个 POD 必须同时工作。
谢谢!
解决方案
推荐阅读
- javascript - TypeError:无法读取 Netlify CMS/React.js 上未定义的属性“地图”
- drupal-7 - cURL 错误 60:SSL:没有替代证书主题名称与目标主机名匹配。项目间沟通
- angular - 如何为 Angular Material Slide Toggle 设置默认值?
- python-3.x - 在 argparse 中,如何使位置参数取决于可选参数?
- python - 是否可以使用 Get 和 Set 操作 matplotlib 直方图中的数据?
- python - 在python3中格式化正则表达式输出
- javascript - 如何选择除使用 Vanilla Javascript 悬停的元素之外的所有元素?
- html - jQuery html() 函数不会使文本变为粗体
- fluentd - Fluentd如何通过执行脚本从文件中获取源代码
- oracle-apex - 配置oracle apex + ords + tomcat 9时出现问题