首页 > 解决方案 > 配置 Spring Cloud Task 使用 Spring Cloud Data Flow server 的 Kafa

问题描述

我有一个作为消息代理Spring Cloud Data Flow (SCDF)运行的服务器Kubernetes clusterKafka现在我正在尝试启动一个Spring Cloud Task (SCT)写入Kafka. 我希望 SCT 使用相同KafkaSCDF。这带来了我有两个问题,希望他们能得到回答:

  1. 如何配置 SCT 以使用与 SCDF 相同的 Kafka?
  2. 是否可以配置 SCT,以便在启动时自动将 Kafka 服务器 uri 传递给 SCT,类似于在启动时传递给 SCT 的数据源属性?

由于我找不到任何有关如何实现此目的的示例,因此非常感谢您的帮助。

编辑:我自己的答案

这就是我如何让它适用于我的情况。我的 SCT 需要spring.kafka.bootstrap-servers提供。在 SCDF 的 shell 中,我将它作为参数提供--spring.kafka.bootstrap-servers=${KAFKA_SERVICE_HOST}:${KAFKA_SERVICE_PORT},其中KAFKA_SERVICE_HOSTKAFKA_SERVICE_PORT是由 SCDF 的 k8s 设置脚本创建的环境变量。

这是在 SCDF 的 shell 中启动任务的方法

dataflow:>task launch --name sample-task --arguments "--spring.kafka.bootstrap-servers=${KAFKA_SERVICE_HOST}:${KAFKA_SERVICE_PORT}"

标签: spring-cloud-dataflowspring-cloud-task

解决方案


您可能需要查看Spring Cloud Task Events参考指南中的部分。

期望您选择选择的活页夹并将该库打包到 Task 应用程序的类路径中。使用该依赖项,您将使用 Spring Cloud Stream 的Kafka binder 属性配置应用程序,例如与spring.cloud.stream.kafka.binder.brokers连接到现有 Kafka 集群相关的其他属性。

使用这些配置启动任务应用程序(从 SCDF)后,您将能够在任务应用程序中发布或接收事件。

或者,使用 Task 应用程序的类路径中的 Kafka-binder,您可以通过全局配置为 SCDF 启动的所有 Task 定义 Kafka binder 属性。见Common Application Properties参考文献。指南以获取更多信息。在此模型中,您不必为每个任务应用程序显式配置 Kafka 属性,而是 SCDF 会在启动任务时自动传播它们。请记住,这些属性将提供给所有任务启动。


推荐阅读