apache-kafka - 带有kafka的春云流
问题描述
在将 kafka 与 Spring Cloud Stream 集成时需要一些帮助。该应用程序非常简单,有 2 个部分(作为单独的 Java 进程运行)
- 消费者将请求放入 RequestTopic 并从 ResponseTopic 获得响应
- 生产者 - 从 RequestTopic 获取请求并将响应放回 ResponseTopic。
我为消费者创建了 RequestSenderChannel 和 ResponseReceiverChannel 接口,并为生产者应用程序创建了 RequestReceiverChannel 和 ResponseSenderChannel 接口。它们都共享同一个 yaml 文件。根据文档 spring.cloud.stream.bindings..destination 应该指定发送或接收消息的主题。但是当我运行应用程序时,应用程序在 kafka 中创建主题为“RequestSender”、“RequestReceiver”、“ResponseSender”和“ResponseReceiver”
我的假设是:由于 YAML 文件中的目标仅指定了两个主题“RequestTopic”和“ResponseTopic”,它应该创建了这些主题。但它会为 YAML 文件中“spring.cloud.stream.bindings”中指定的属性创建 Kafka 主题。有人可以指出配置/代码中的问题吗?
public interface RequestReceiverChannel
{
String requestReceiver ="RequestReceiver";
@Input(requestReceiver)
SubscribableChannel pathQueryRequest();
}
public interface RequestSenderChannel
{
String RequestSender ="RequestSender";
@Output(RequestSender)
MessageChannel pathQueryRequestSender();
}
public interface ResponseReceiverChannel
{
String ResponseReceiver = "ResponseReceiver";
@Input(ResponseReceiver)
SubscribableChannel pceResponseServiceReceiver();
}
public interface ResponseSenderChannel
{
String ResponseSender = "ResponseSender";
@Output(ResponseSender)
MessageChannel pceResponseService();
}
'''
YAML 配置文件
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
RequestSender:
binder: kafka
destination: RequestTopic
content-type: application/protobuf
group: consumergroup
ResponseSender:
binder: kafka
destination: ResponseTopic
content-type: application/protobuf
group: consumergroup
RequestReceiver:
binder: kafka
destination: RequestTopic
content-type: application/protobuf
group: consumergroup
ResponseReceiver:
binder: kafka
destination: ResponseTopic
content-type: application/protobuf
group: consumergroup
kafka:
bindings:
RequestTopic:
consumer:
autoCommitOffset: false
ResponseTopic:
consumer:
autoCommitOffset: false
binder:
brokers: ${SERVICE_KAFKA_HOST:localhost}
zkNodes: ${SERVICE_ZOOKEEPER_HOST:127.0.0.1}
defaultZkPort: ${SERVICE_ZOOKEEPER_PORT:2181}
defaultBrokerPort: ${SERVICE_KAFKA_PORT:9092}
解决方案
通过这样做spring.cloud.stream.bindings.<binding-name>.destination=foo
,您表示希望将<binding-name>
(例如RequestSender
)指定的绑定映射到名为 的代理目的地foo
。如果这样的目的地不存在,它将被自动配置。所以没有问题。
也就是说,我们刚刚发布了 Horsham.RELEASE(云 Hoxton.RELEASE 的一部分),我们正在远离您当前使用的基于注释的模型,转而采用更简单的功能模型。您可以在我们的发布博客中阅读有关它的更多信息,该博客还提供了 4 篇文章的链接,我们在其中详细阐述并提供了更多关于函数式编程范式的示例。
推荐阅读
- java - Java 模块层:从未命名的模块访问自定义 ModuleLayer 中的类
- angular - Angular 路由器接管 nginx 位置
- javascript - Angular 9 异步验证
- python - Python Bokeh 翻转颜色
- matlab - 如何使用 DSP 信号处理工具箱计算指数加权移动平均值
- hyperledger-fabric - 如何通过链码以动态方式授权访问 Hyperledger Fabric 上的私有集合?
- html - 浏览器不呈现简单的 html 图标
- javascript - 根据使用 Javascript/React 的功能路由不同的 url(已解决)
- reactjs - React Native 如何导航到屏幕并重置一些状态?
- android - TextInputLayout 提示重力底部