首页 > 解决方案 > 自定义 JSON 的 Serde 类(Kotlin 和 Kafka)

问题描述

我正在用 Kotlin 编写一个 kafka 流应用程序,它使用一条 JSON 消息(没有 AVRO 或 Schema Registry)。

在 MyMessage.kt 中,我已将MyMessage类声明为@Serializable.

我的消息.kt

import kotlinx.serialization.Serializable

@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)

流媒体.kt

val s: KString<String, MyMessage> = streamsBuilder()
    .stream("my", Consumed.with(Serdes.String, Serdes.serdeFrom(MyMessage::class.java)

运行此程序时,我在上面的行中收到以下错误:

Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID

我错过了什么?

标签: kotlinserializationapache-kafka-streams

解决方案


参数Serdes.serdesFrom()期望 aSerializerDeserializer对象(两个接口都是包中的 Kafka 接口,与注释org.apache.kafka.common.serialization无关。@Serializable

您需要创建类MyMessageSerializer extends Serializer并将MyMessageDeserialzer extends Deserializer这些对象传递给方法。要使用这两个类实现实际的序列化/反序列化,您可以根据需要依赖默认的序列化/反序列化。


推荐阅读