How do I provide Kafka Streams properties through Spring Cloud Stream in YAML?

Question

I want to move the spring.kafka.streams.* properties under spring.cloud.stream. Is that possible? I tried a streams-properties key, analogous to consumer-properties or producer-properties, but it does not work.

spring:
  cloud:
    config:
      override-system-properties: false
      server:
        health:
          enabled: false
    stream:
      bindings:
        input_technischerplatz:
          destination: technischerplatz
        output_technischerplatz:
          destination: technischerplatz
      default:
        group: '${spring.application.name}'
        consumer:
          max-attempts: 5
      kafka:
        binder:
          auto-add-partitions: false
          auto-create-topics: false
          brokers: '${values.spring.kafka.bootstrap-servers}'
          configuration:
            header.mode: headers
          consumer-properties:
            allow.auto.create.topics: false
            auto.offset.reset: '${values.spring.kafka.consumer.auto-offset-reset}'
            enable.auto.commit: false
            isolation.level: read_committed
            max.poll.interval.ms: 300000
            max.poll.records: 100
            session.timeout.ms: 300000
          header-mapper-bean-name: defaultKafkaHeaderMapper
          producer-properties:
            acks: all
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            max.in.flight.requests.per.connection: 1
            max.block.ms: '${values.spring.kafka.producer.max-block-ms}'
            retries: 10
          required-acks: -1
  kafka:
    streams:
      applicationId: '${spring.application.name}_streams'
      properties:
        default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
        default.timestamp.extractor: org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp
        state.dir: '${values.spring.kafka.streams.properties.state.dir}'

Tags: apache-kafka, apache-kafka-streams, spring-cloud-stream, spring-cloud-stream-binder-kafka

Answer

You can set the Kafka Streams properties under spring.cloud.stream as follows:

spring.cloud.stream.kafka.streams.binder.applicationId: my-application-id
spring.cloud.stream.kafka.streams.binder.configuration:
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
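
Applied to the configuration from the question, the spring.kafka.streams block might be moved as in the sketch below. It assumes the Kafka Streams binder is on the classpath and reuses the property values and placeholders from the question unchanged; the nested layout is simply the dotted keys above written out in full.

```yaml
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            # Replaces spring.kafka.streams.applicationId from the question
            applicationId: '${spring.application.name}_streams'
            # Replaces spring.kafka.streams.properties from the question;
            # entries here are passed through as Kafka Streams configuration
            configuration:
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.timestamp.extractor: org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp
              state.dir: '${values.spring.kafka.streams.properties.state.dir}'
```

The existing spring.cloud.stream.kafka.binder section (brokers, consumer-properties, producer-properties) belongs to the regular Kafka binder and would stay where it is.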

For more details, see the documentation:

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.M3/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_streams_binder

