首页 > 解决方案 > 带有kafka的春云流

问题描述

在将 kafka 与 Spring Cloud Stream 集成时需要一些帮助。该应用程序非常简单,有 2 个部分(作为单独的 Java 进程运行)

  1. 消费者将请求放入 RequestTopic 并从 ResponseTopic 获得响应
  2. 生产者 - 从 RequestTopic 获取请求并将响应放回 ResponseTopic。

我为消费者创建了 RequestSenderChannel 和 ResponseReceiverChannel 接口,并为生产者应用程序创建了 RequestReceiverChannel 和 ResponseSenderChannel 接口。它们都共享同一个 yaml 文件。根据文档 spring.cloud.stream.bindings..destination 应该指定发送或接收消息的主题。但是当我运行应用程序时,应用程序在 kafka 中创建主题为“RequestSender”、“RequestReceiver”、“ResponseSender”和“ResponseReceiver”

我的假设是:由于 YAML 文件中的目标仅指定了两个主题“RequestTopic”和“ResponseTopic”,它应该创建了这些主题。但它会为 YAML 文件中“spring.cloud.stream.bindings”中指定的属性创建 Kafka 主题。有人可以指出配置/代码中的问题吗?

public interface RequestReceiverChannel
{
    String requestReceiver ="RequestReceiver";
    @Input(requestReceiver)
    SubscribableChannel pathQueryRequest();
}

public interface RequestSenderChannel
{
    String RequestSender ="RequestSender";
    @Output(RequestSender)
    MessageChannel pathQueryRequestSender();
}

public interface ResponseReceiverChannel
{
    String ResponseReceiver = "ResponseReceiver";
    @Input(ResponseReceiver)
    SubscribableChannel pceResponseServiceReceiver();
}
public interface ResponseSenderChannel
{
    String ResponseSender = "ResponseSender";
    @Output(ResponseSender)
    MessageChannel pceResponseService();
}
'''

YAML 配置文件

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        RequestSender:
          binder: kafka
          destination: RequestTopic
          content-type: application/protobuf
          group: consumergroup
        ResponseSender:
          binder: kafka
          destination: ResponseTopic
          content-type: application/protobuf
          group: consumergroup
        RequestReceiver:
          binder: kafka
          destination: RequestTopic
          content-type: application/protobuf
          group: consumergroup
        ResponseReceiver:
          binder: kafka
          destination: ResponseTopic
          content-type: application/protobuf
          group: consumergroup
      kafka:
        bindings:
          RequestTopic:
            consumer:
              autoCommitOffset: false
          ResponseTopic:
            consumer:
              autoCommitOffset: false
        binder:
          brokers: ${SERVICE_KAFKA_HOST:localhost}
          zkNodes: ${SERVICE_ZOOKEEPER_HOST:127.0.0.1}
          defaultZkPort: ${SERVICE_ZOOKEEPER_PORT:2181}
          defaultBrokerPort: ${SERVICE_KAFKA_PORT:9092}

标签: apache-kafkaspring-cloud-stream

解决方案


通过这样做spring.cloud.stream.bindings.<binding-name>.destination=foo,您表示希望将<binding-name>(例如RequestSender)指定的绑定映射到名为 的代理目的地foo。如果这样的目的地不存在,它将被自动配置。所以没有问题。

也就是说,我们刚刚发布了 Horsham.RELEASE(云 Hoxton.RELEASE 的一部分),我们正在远离您当前使用的基于注释的模型,转而采用更简单的功能模型。您可以在我们的发布博客中阅读有关它的更多信息,该博客还提供了 4 篇文章的链接,我们在其中详细阐述并提供了更多关于函数式编程范式的示例。


推荐阅读