首页 > 解决方案 > spring.cloud.stream.kafka.bindings。.producer.configuration 未应用

问题描述

我有一个用于流数据处理的小型应用程序。我从一个主题接收数据,处理并写入另一个主题。我将 spring cloud 和 apache kafka 用于我的 java 应用程序。我想为生产者设置一些属性,比如 bufferSize。这是我的 application.yaml 的示例

spring:
  cloud:
    stream:
      bindings:
        point-output-channel:
          destination: DST4-topic
        point-input-channel:
          destination: SRC4-topic
      kafka:
        streams:
          bindings:
            point-output-channel:
              producer:
                bufferSize: 14000
          binder:
            brokers:  localhost:9092
            configuration:
              commit.interval.ms: 10000
              state.dir: state-store
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde

但是 bufferSize 值并没有应用于生产者,这可能是什么问题?谢谢你。

标签: javaapache-kafkaspring-cloudapache-kafka-streamsspring-kafka

解决方案


文档来看,您的配置似乎有点错误。spring.cloud.stream.kafka.bindings.<channelName>.producerKafka 绑定属性不应该spring.cloud.stream.kafka.streams.bindings.<channelName>.producer

试试这个:

spring:
  cloud:
    stream:
      bindings:
        point-output-channel:
          destination: DST4-topic
        point-input-channel:
          destination: SRC4-topic
      kafka:
        bindings:
          point-output-channel:
            producer:
              bufferSize: 14000
        binder:
          brokers: localhost:9092
          configuration:
            commit.interval.ms: 10000
            state.dir: state-store
            default:
              key:
                serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              value:
                serde: org.apache.kafka.common.serialization.Serdes$StringSerde

推荐阅读