java - 嵌套 Java 类的 No Encoder found 错误
问题描述
我创建了一个 Scala 类,如下所示:
case class MyObjectWithEventTime(value: MyObject, eventTime: Timestamp)
MyObject 是一个 Java 对象。
我试图在我的 Spark Structured Streaming 作业中按如下方式使用它:
implicit val myObjectEncoder: Encoder[MyObject] = Encoders.bean(classOf[MyObject])
val withEventTime = mystream
.select(from_json(col("value").cast("string"), schema).alias("value"))
.withColumn("eventTime", to_timestamp(col("value.timeArrived")))
.as[MyObjectWithEventTime]
.groupByKey(row => {... some code here
})
.mapGroupsWithState(GroupStateTimeout.ProcessingTimeTimeout())(updateAcrossEvents)
.filter(col("id").isNotNull)
.toJSON
.writeStream
.format("kafka")
.option("checkpointLocation", "/tmp")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", conf.KafkaProperties.outputTopic)
.option("checkpointLocation", "/tmo/checkpointLocation")
.outputMode("update")
.start()
.awaitTermination()
但我不断收到这个错误......
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for com.xxx.MyObject
- field (class: "com.xxx.MyObject", name: "value")
- root class: "com.xxx.MyObjectWithEventTime"
解决方案
尝试为MyObjectWithEventTime
和使用Encoders.javaSerialization[T]
方法定义编码器:
implicit val myObjectEncoder: Encoder[MyObject] = Encoders.javaSerialization[MyObject]
implicit val myObjectWithEventEncoder: Encoder[MyObjectWithEventTime] = Encoders.javaSerialization[MyObjectWithEventTime]
请记住,您的 java 类MyObject
应该实现 Serializable 并为所有字段实现公共 getter 和 setter。
推荐阅读
- mysql - 数据库设计 SQL 和转换表
- php - 在 Magento 2.2.3 中构建自定义支付网关
- excel - 如何在 VB6 的文本框或收件箱中键入 +-*/ 来制作我的公式?
- c# - WPF - 更换功能区
- radare2 - 如何使用雷达获取地址的价值
- oracle11g - 没有归档日志的 RMAN 在线完整备份
- asp.net-mvc-4 - 使用 .NET Framework 4.5 在 ASP.NET MVC 4 中将 JSON 传递给 WebApi
- python - 如何在运行大量后端线程时管理 Tkinter 窗口响应性
- rust - 这个“原子”的 Rust 代码和它的“非原子”对应物有什么区别?
- python - 如何在shell脚本中执行python3程序