scala - Kryo 序列化问题 Spark
问题描述
我正在尝试在 spark 代码中为 Kryo 序列化注册下面的类,但我收到一个错误。
代码:
class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
lazy val producer = createProducer()
def send(topic: String, key: String, value: String): Unit = producer.send(new ProducerRecord(topic, key, value))
}
object KafkaSink {
def apply(config: Properties): KafkaSink = {
val f = () => {
val producer = new KafkaProducer[String, String](config)
//close producer if VM exits
sys.addShutdownHook {
producer.close()
}
producer
}
new KafkaSink(f)
}
}
错误:
引起:java.lang.IllegalArgumentException:类未注册:com.test.KafkaSink$$anonfun$1 注意:要注册此类,请使用:kryo.register(com.test.KafkaSink$$anonfun$1.class); 在 com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488) 在 com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97) 在 com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java :517) 在 com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:76) ... 14 更多
我尝试使用以下两种方法注册课程,但这些方法不起作用并给了我同样的错误
kryo.register(classOf[KafkaSink])
kryo.register(KafkaSink.getClass)
我怎样才能注册这个课程?
解决方案
推荐阅读
- macos - Mac brew arangodb 延迟启动日志文件路径
- talend - 在 tRestClient Talend 中传递参数
- firebase - 带有 Firebase 云消息传递的 Vue pwa 无法正常工作
- python - 除了块未捕获的云 Firestore 异常
- javascript - 在选择日期库中覆盖原型函数
- excel - 使用 web api 下载 zip 文件会导致文件损坏
- mysql - Mysql:如何获取从去年 7 月到当前日期的数据
- python - 如何在 Python 3 中减去嵌套列表中的元素?
- android - 在 WhatsApp 上分享链接时包含图片
- javascript - 使用 js 和 html 存储预定义变量并在文本框中检索