avro - Spring Cloud Stream Kafka Consumer 应用程序不允许添加供应商
问题描述
我正在开发 Spring Cloud Stream Kafka 应用程序。我只添加了消费者来使用来自主题的消息并使用 FIX 协议将它们传递给第三方。
到目前为止它工作正常,但现在第三方发回了响应,我想将它们制作成一个新主题。当我在现有代码中添加供应商时,它开始表现得很奇怪。bootstrap.servers
配置从 remoteHost 代理更改为 localhost 并开始给出以下错误:
[AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9092) could not be established> Broker may not be available.
如果尝试连接本地主机,则会出现错误,因为没有任何 Kafka 设置。
下面是我的 application.yml 文件:
spring.cloud.stream.function.definition: amerData;emeaData;ackResponse #added new ackResponse here
spring.cloud.stream.kafka.streams:
binder:
brokers: remoteHost:9092
configuration:
schema.registry.url: remoteHost:8081
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
bindings:
ackResponse-out-0: #new addition
producer.configuration:
key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
spring.cloud.stream.bindings:
amerData-in-0:
destination: topic1
emeaData-in-0:
destination: topic2
ackResponse-out-0: #new addition
destination: topic3
并尝试了 Supplier -> Supplier<String> ackResponse()
or Supplier<Message<String>> ackResponse()
It only doesn't change remoteHost to localhost when I doing Supplier<KStream<String,String>> ackResponse()
,然后 bootstrap.servers 显示配置的远程选项,但这不正确,我无法编写收到的响应(主要是像这样的字符串或 json) 到 Kafka 主题。
我确实根据需要配置了我的消费者,Consumer<KStream<String, AVROPOJO1>> amerData()
他们Consumer<KStream<String, AVROPOJO2>> emeaData()
工作正常。
我错过或搞砸了什么?我们不能在同一个弹簧云流应用程序中同时拥有生产者/消费者吗?使用Streambridge
也无法解决这个问题。有人可以帮忙吗?
解决方案
如果您Supplier
像以前那样添加 bean,它会成为使用MessageChannel
基于 Kafka 绑定器的常规生产者。您需要在项目中添加常规的 Kafka 活页夹 ( spring-cloud-stream-binder-kafka
)。它的绑定应该在spring.cloud.stream.kafka.bindings
. 我看到你在上面定义了它spring.cloud.stream.kafka.streams.bindings
。我想知道这是不是问题?
推荐阅读
- javascript - React Slick Custom Carousel 与图像重叠 div
- glib - GLib-GIO-ERROR **:未安装设置架构“com.github.mfru.vala-todo”
- javascript - 如何使用循环创建 div。我在一本书中找到了这段代码,但它不起作用
- vue.js - 为什么 Vuex 的动作会返回一个承诺
? - javascript - Angular ngx-mat-datetime-picker 选择事件
- c# - Log4net 在记录几秒钟后将日志路径更改为安装文件夹。这是为什么?
- c++ - 将成员函数作为参数传递给不同标头中的父类
- android - 在自定义视图中处理变量 Drawable 的正确方法?
- python - 无法调用函数
- google-cloud-platform - GCP vpc相同的目的地,不同的路线