How to build a KStream from a SubscribableChannel

Problem description

I'm using Spring Cloud Stream and want to play around a bit with KStreams/KTables.

I'm looking for a way to go from a standard Kafka topic to a stream.

I've already done this in KSQL, but I'm trying to figure out whether there's a way to have Spring Boot handle it. The best examples I could find are ones where the @Input and @Output channels are already KStreams, but I don't think that's what I want.

Kafka setup

Inside Spring Boot, I'm doing the following:

From there I'd like to take its output and build a KStream/KTable keyed on platformUID.

Input data

The data I'm working with looks like this:

{
  "platformUID": "UID",
  "type": "TLPS",
  "state": "PLATFORM_INITIALIZED",
  "fuelremaining": 5.9722E+24,
  "latitude": 39,
  "longitude": -115,
  "altitude": 0,
  "time": "2018-07-18T00:00:00Z[UTC]"
}
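The question references a ForceEntity class but never shows it. A minimal Java sketch matching the payload above (field names from the JSON; the types are assumptions inferred from the sample values):

```java
import java.time.ZonedDateTime;

// Hypothetical sketch of the ForceEntity referenced later in the question;
// field types are inferred from the sample JSON and may differ from the real class.
public record ForceEntity(
        String platformUID,
        String type,
        String state,
        double fuelremaining,
        double latitude,
        double longitude,
        double altitude,
        String time) {

    // The raw time value "2018-07-18T00:00:00Z[UTC]" is actually valid
    // ISO_ZONED_DATE_TIME input, so java.time can parse it as-is.
    public ZonedDateTime parsedTime() {
        return ZonedDateTime.parse(time);
    }
}
```

Keeping `time` as a String mirrors the KSQL approach below, where the question also reads time as a string first.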

KSQL

I can run these KSQL commands to create what I need. (Here I read the time as a string, rather than as the actual timestamp I use in the Java/Kotlin implementation.)

CREATE STREAM force_no_key (
    platformUID string, 
    type string, 
    state string, 
    fuelremaining DOUBLE, 
    latitude DOUBLE, 
    longitude DOUBLE, 
    altitude DOUBLE
  ) with (
    kafka_topic='force-entities-topic', 
    value_format='json');

From there I make another stream (because I couldn't get it to read the key correctly):

CREATE STREAM force_with_key 
  WITH (KAFKA_TOPIC='blue_force_with_key') AS
  select PLATFORMUID as UID, LATITUDE as lat, LONGITUDE as LON, ALTITUDE as ALT, state, type 
  FROM force_no_key 
  PARTITION BY UID;

And from that point:

CREATE TABLE FORCE_TABLE
( UID VARCHAR, 
    LAT DOUBLE, 
    LON DOUBLE,
    ALT DOUBLE
) WITH (KAFKA_TOPIC = 'force_with_key',
        VALUE_FORMAT='JSON',
        KEY = 'UID');

Java style!

I think this is where I'm running into trouble. I define my binding interface here:



    interface ForceStreams {

        companion object {
            // These names map to the bindings configured in application.yml
            const val DIRTY_INPUT = "dirty-force-in"
            const val CLEANED_OUTPUT = "clean-force-out"
            const val CLEANED_INPUT = "clean-force-in"
            const val STREAM_OUT = "stream-out"
            const val TABLE_OUT = "table-out"
        }

        @Input(DIRTY_INPUT)
        fun initialInput(): MessageChannel

        @Output(CLEANED_OUTPUT)
        fun cleanOutput(): SubscribableChannel

        @Input(CLEANED_INPUT)
        fun cleanInput(): MessageChannel

        @Output(STREAM_OUT)
        fun cleanedBlueForceMessage(): KStream<String, ForceEntity>

        @Output(TABLE_OUT)
        fun tableOutput(): KTable<String, ForceEntity>
    }

Then I do the cleaning with this block:

@StreamListener(ForceStreams.DIRTY_INPUT)
@SendTo(ForceStreams.CLEANED_OUTPUT)
fun forceTimeCleaner(@Payload message: String): ForceEntity {

    // Parse the raw JSON into a map so individual fields can be fixed up
    val inputMap: Map<String, Any> = objectMapper.readValue(message)

    val map = inputMap.toMutableMap()

    // Normalize the type and strip the non-ISO "[UTC]" suffix from the time
    map["type"] = map["type"].toString().replace("-", "_")
    map["time"] = map["time"].toString().replace("[UTC]", "")

    // Serialize back to JSON and bind into the typed entity
    val json = objectMapper.writeValueAsString(map)

    val fe: ForceEntity = objectMapper.readValue(json, ForceEntity::class.java)

    return fe
}
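The two string fixes in the cleaner are easy to isolate and check without the Jackson round-trip. A plain-Java sketch (the class and method names here are hypothetical, not from the question):

```java
import java.time.Instant;

public final class ForceTimeCleaner {

    private ForceTimeCleaner() {}

    // Hyphens in the type become underscores, e.g. "TL-PS" -> "TL_PS"
    public static String cleanType(String type) {
        return type.replace("-", "_");
    }

    // Drop the trailing "[UTC]" so the value is plain ISO-8601 instant text
    public static String cleanTime(String time) {
        return time.replace("[UTC]", "");
    }

    public static void main(String[] args) {
        String cleaned = cleanTime("2018-07-18T00:00:00Z[UTC]");
        // Once the suffix is stripped, java.time parses it directly
        System.out.println(Instant.parse(cleaned)); // 2018-07-18T00:00:00Z
    }
}
```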

But here I'm going from a MessageChannel to a SubscribableChannel.

What I'm not sure how to do is go from the SubscribableChannel to a KStream<String, ForceEntity> or a KTable<String, ForceEntity>.

Any help would be appreciated - thanks.

Edit - application.yml

server:
  port: 8888
spring:
  application:
    name: Blue-Force-Table
  kafka:
    bootstrap-servers: # This seems to be for the KStreams; the other config below is for the regular streams
      - localhost:19092
  cloud:
    stream:
      defaultBinder: kafka
      kafka:
        binder:
          brokers: localhost:19092
      bindings:
        dirty-force-in:
          destination: force-entities-topic
          contentType: application/json
        clean-force-in:
          destination: force-entities-topic-clean
          contentType: application/json
        clean-force-out:
          destination: force-entities-topic-clean
          contentType: application/json
        stream-out:
          destination: force_stream
          contentType: application/json
        table-out:
          destination: force_table
          contentType: application/json

I guess the follow-up question is - is this even possible? Can you mix and match binders in a single function?

Tags: java, spring-boot, kotlin, apache-kafka-streams, spring-cloud-stream

Solution


First, in your StreamListener you receive data through the DIRTY_INPUT binding and write it out through the CLEANED_OUTPUT binding. Then you need another StreamListener, where you receive that data as a KStream, process it, and write to the output.

First processor:

@StreamListener(ForceStreams.DIRTY_INPUT)
@SendTo(ForceStreams.CLEANED_OUTPUT)
fun forceTimeCleaner(@Payload message: String): ForceEntity {
....

Change the following binding from a MessageChannel:

@Input(CLEANED_INPUT)
fun cleanInput(): MessageChannel

to a KStream binding:

@Input(CLEANED_INPUT)
fun cleanInput(): KStream<String, ForceEntity>

Second processor:

@StreamListener(CLEANED_INPUT)
@SendTo(STREAM_OUT)
public KStream<String, ForceEntity> process(
        KStream<String, ForceEntity> forceEntityStream) {

    return forceEntityStream
            ........
            .toStream();
}

Currently, the Kafka Streams binder in Spring Cloud Stream does not support writing data out as a KTable. Only KStream objects are allowed on the output (KTable bindings are allowed on the input). If this is a hard requirement, you should look into Spring Kafka, where you can drop down a level and perform that kind of outbound operation.
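As for the follow-up question about mixing binders: the message-channel bindings and the Kafka Streams bindings can coexist in one application by assigning each binding its binder explicitly. A sketch of what that could look like in application.yml (the `kstream` binder type name is from the Spring Cloud Stream Kafka Streams binder of that era; verify it against the version in use):

```yaml
spring:
  cloud:
    stream:
      bindings:
        dirty-force-in:
          destination: force-entities-topic
          binder: kafka      # regular message-channel binder
        clean-force-out:
          destination: force-entities-topic-clean
          binder: kafka
        clean-force-in:
          destination: force-entities-topic-clean
          binder: kstream    # Kafka Streams binder for the KStream input
        stream-out:
          destination: force_stream
          binder: kstream
```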

Hope that helps.

