首页 > 解决方案 > Spring Cloud Stream Kafka Consumer 应用程序不允许添加供应商

问题描述

我正在开发 Spring Cloud Stream Kafka 应用程序。我只添加了消费者来使用来自主题的消息并使用 FIX 协议将它们传递给第三方。

到目前为止它工作正常,但现在第三方发回了响应,我想将它们制作成一个新主题。当我在现有代码中添加供应商时,它开始表现得很奇怪。bootstrap.servers配置从 remoteHost 代理更改为 localhost 并开始给出以下错误:

[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established> Broker may not be available.

如果尝试连接本地主机,则会出现错误,因为没有任何 Kafka 设置。

下面是我的 application.yml 文件:

spring.cloud.stream.function.definition: amerData;emeaData;ackResponse  #added new ackResponse here
spring.cloud.stream.kafka.streams:
  binder:
    brokers: remoteHost:9092
    configuration:
      schema.registry.url: remoteHost:8081
      default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
  bindings:
    ackResponse-out-0:           #new addition
      producer.configuration:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer

spring.cloud.stream.bindings:
  amerData-in-0:
    destination: topic1
  emeaData-in-0:
    destination: topic2
  ackResponse-out-0:       #new addition
    destination: topic3

并尝试了 Supplier -> Supplier<String> ackResponse()or Supplier<Message<String>> ackResponse() It only doesn't change remoteHost to localhost when I doing Supplier<KStream<String,String>> ackResponse(),然后 bootstrap.servers 显示配置的远程选项,但这不正确,我无法编写收到的响应(主要是像这样的字符串或 json) 到 Kafka 主题。

我确实根据需要配置了我的消费者,Consumer<KStream<String, AVROPOJO1>> amerData()他们Consumer<KStream<String, AVROPOJO2>> emeaData()工作正常。

我错过或搞砸了什么?我们不能在同一个弹簧云流应用程序中同时拥有生产者/消费者吗?使用Streambridge也无法解决这个问题。有人可以帮忙吗?

标签: avroapache-kafka-streamsspring-cloud-streamkafka-producer-apiconfluent-schema-registry

解决方案


如果您Supplier像以前那样添加 bean,它会成为使用MessageChannel基于 Kafka 绑定器的常规生产者。您需要在项目中添加常规的 Kafka 活页夹 ( spring-cloud-stream-binder-kafka)。它的绑定应该在spring.cloud.stream.kafka.bindings. 我看到你在上面定义了它spring.cloud.stream.kafka.streams.bindings。我想知道这是不是问题?


推荐阅读