首页 > 解决方案 > Kryo 序列化问题 Spark

问题描述

我正在尝试在 spark 代码中为 Kryo 序列化注册下面的类,但我收到一个错误。

代码:

class KafkaSink(createProducer: () => KafkaProducer[String, String]) extends Serializable {
  lazy val producer = createProducer()
  def send(topic: String, key: String, value: String): Unit = producer.send(new ProducerRecord(topic, key, value))
}

object KafkaSink {
  def apply(config: Properties): KafkaSink = {
    val f = () => {
      val producer = new KafkaProducer[String, String](config)

      //close producer if VM exits
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new KafkaSink(f)
  }
}

错误:

引起:java.lang.IllegalArgumentException:类未注册:com.test.KafkaSink$$anonfun$1 注意:要注册此类,请使用:kryo.register(com.test.KafkaSink$$anonfun$1.class); 在 com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:488) 在 com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:97) 在 com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java :517) 在 com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:76) ... 14 更多

我尝试使用以下两种方法注册课程,但这些方法不起作用并给了我同样的错误

kryo.register(classOf[KafkaSink])
kryo.register(KafkaSink.getClass)

我怎样才能注册这个课程?

标签: scalaapache-sparkapache-kafkakryo

解决方案


推荐阅读