scala - avro4s 无法反序列化 AnyRef
问题描述
我有一个简单的案例类
case class KafkaContainer(key: String, payload: AnyRef)
然后我想通过制作人将此发送到kafka主题我这样做
val byteArrayStream = new ByteArrayOutputStream()
val output = AvroOutputStream.binary[KafkaContainer](byteArrayStream)
output.write(msg)
output.close()
val bytes = byteArrayStream.toByteArray
producer.send(new ProducerRecord("my_topic", msg.key, bytes))
这运作良好
然后我尝试消费这个
Consumer.committableSource(consumerSettings, Subscriptions.topics("my_topic"))
.map { msg =>
val in: ByteArrayInputStream = new ByteArrayInputStream(msg.record.value())
val input: AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binary[KafkaContainer](in)
val result: Option[KafkaContainer] = input.iterator.toSeq.headOption
input.close()
...
}.runWith(Sink.ignore)
这适用于有效载荷中的任何类。
但!如果是任何参考。消费者代码失败
错误:(38, 96) 找不到类型为 com.sksamuel.avro4s.FromRecord[test.messages.KafkaContainer] val 输入的证据参数的隐式值:AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
错误:(38, 96) 二进制方法的参数不足:(隐含证据 $21:com.sksamuel.avro4s.SchemaFor[test.messages.KafkaContainer],隐含证据 $22:com.sksamuel.avro4s.FromRecord[test.messages. KafkaContainer])com.sksamuel.avro4s.AvroBinaryInputStream[test.messages.KafkaContainer]。未指定值参数证据 $22。val 输入:AvroBinaryInputStream[KafkaContainer] = AvroInputStream.binaryKafkaContainer
如果我声明隐含
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
它无法编译
错误:(58, 71) 找不到类型为 com.sksamuel.avro4s.FromValue[Object] 隐式值的惰性隐式值 fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
错误:(58, 71) 方法lazyConverter 的参数不足:(隐式 fromValue: shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]])shapeless.Lazy[com.sksamuel.avro4s.FromValue[Object]]。未指定值参数 fromValue。隐式验证 fromRecord:FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
如果添加每个隐含的编译器是必需的
lazy implicit val fromValue: FromValue[Object] = FromValue[Object]
implicit val fromRecordObject: FromRecord[Object] = FromRecord[Object]
implicit val schemaFor: SchemaFor[KafkaContainer] = SchemaFor[KafkaContainer]
implicit val fromRecord: FromRecord[KafkaContainer] = FromRecord[KafkaContainer]
编译失败并出现错误
宏扩展期间出现错误:(58, 69) 异常:java.lang.IllegalArgumentException:要求失败:需要一个案例类,但 Object 不在 com.sksamuel.avro4s 的 scala.Predef$.require(Predef.scala:277) 处。 FromRecord$.applyImpl(FromRecord.scala:283) 隐式验证 fromRecordObject: FromRecord[Object] = FromRecord[Object]
但是如果我为某个类替换 AnyRef - 不需要隐式,一切都会再次正常
解决方案
我在使用 Any 数据类型时遇到了类似的问题。您必须指定此成员变量的有效类型,因为 Any 或 AnyRef 可以是任何类型。然后使用 Either 或 shapeless(另见Github 文档)。就我而言,它可以是 String、Long、Double 或 null,所以使用 shapeless 你可以:
case class DataContainer(name: String, value: Option[String:+:Long:+:Double:+:CNil])
这将转换为 AVRO 中的联合类型:
{
"name" : "value",
"type" : [ "null", "string", "long", "double" ]
}
推荐阅读
- android - 在 SharePreferences 中保存 API 调用响应
- java - Kafka 流式添加全局存储的用例
- go - 例程、通道和无限循环
- laravel - 当我返回一个带有变形关系的图像的产品时,它会返回所有产品
- java - 删除 cookie 时违反 SonarQube
- xml - 如何在powershell中从xml获取特定字符串
- android - 如何防止使用 IL2CPP 进行代码剥离?
- java - java和Kotlin混合项目中的依赖解析
- docker - 创世区块创建失败(Hyperledger Fabric)
- reactjs - 如何将 React 应用程序集成到 Wordpress(反应路由器)中?