kotlin - 自定义 JSON 的 Serde 类(Kotlin 和 Kafka)
问题描述
我正在用 Kotlin 编写一个 kafka 流应用程序,它使用一条 JSON 消息(没有 AVRO 或 Schema Registry)。
在 MyMessage.kt 中,我已将MyMessage
类声明为@Serializable
.
我的消息.kt
import kotlinx.serialization.Serializable
@Serializable
data class MyMessage(val from: String, val to: String, val msg: String)
流媒体.kt
val s: KString<String, MyMessage> = streamsBuilder()
.stream("my", Consumed.with(Serdes.String, Serdes.serdeFrom(MyMessage::class.java)
运行此程序时,我在上面的行中收到以下错误:
Exception in thread "main" java.lang.IllegalArgumentException: Unknown class for built-in serializer. Supported types are: String, Short, Integer, Long, Float, Double, ByteArray, ByteBuffer, Bytes, UUID
我错过了什么?
解决方案
参数Serdes.serdesFrom()
期望 aSerializer
和Deserializer
对象(两个接口都是包中的 Kafka 接口,与注释org.apache.kafka.common.serialization
无关。@Serializable
您需要创建类MyMessageSerializer extends Serializer
并将MyMessageDeserialzer extends Deserializer
这些对象传递给方法。要使用这两个类实现实际的序列化/反序列化,您可以根据需要依赖默认的序列化/反序列化。
推荐阅读
- python - 在 Python 中使用导入模块编写自己的函数的最佳实践?
- playframework - 在 Play 2.6 上使用 stb-native-packager 和 Debian 和 SystemV
- java - 在表单中间显示 JInternalFrame 数据
- python - 如何禁用 PytestDeprecationWarning:直接构建 Flake8Item 已被弃用,请使用 Flake8Item.from_parent
- reactjs - 有什么改进设计的想法吗?
- javascript - Angular 和 FireBase 问题
- c# - 如何将记录添加到数据网格并从另一个窗口更新?
- xamarin - 在 Xamarin 表单中对源图像使用 https 连接
- vue.js - 将元素插入模板但没有 v-model
- node.js - 如何在没有这些代码重复错误的情况下将 Knockout.js 与 TypeScript 一起使用?