首页 > 解决方案 > 如何将 Scala Maps 与 akka-kryo-serializer 一起使用

问题描述

我正在尝试使用库 akka-kryo-serializer。

我设法使它使用字符串作为测试,但是当我使用相同的代码处理 Map 时,仔细按照网站的说明,我一直遇到相同的错误:

错误1:我按照网站的说明写:

package entellect.spike.Kryo

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}


object KryoSpike extends App {

  val kryo = new Kryo()
  kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaMapSerializer])
  kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaMapSerializer])

  val testin = Map("id" -> "objID", "field1" -> "field1Value")

  val outStream = new ByteArrayOutputStream()
  val output = new Output(outStream, 4096)
  kryo.writeClassAndObject(output, testin)
  output.flush()


  val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
  val testout = kryo.readObject(input, classOf[Map[String,String]])

  println(testout.toString)

}

由于以下两行取自网站,此代码无法编译:

kryo.addDefaultSerializer(classOf[scala.collection.Map[ , ]], classOf[ScalaMapSerializer]) kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaMapSerializer])

我尝试遵循网站测试中的内容。对于测试地图,它使用:

kryo.setRegistrationRequired(true) kryo.addDefaultSerializer(classOf[scala.collection.Map[_, _]], classOf[ScalaImmutableMapSerializer]) kryo.register(classOf[scala.collection.immutable.HashMap$HashTrieMap], 40)

以下行没有编译,因为编译没有找到“HashMap$HashTrieMap”

classOf[ScalaImmutableMapSerializer])
    kryo.register(classOf[scala.collection.immutable.HashMap$HashTrieMap],

40)

最后我的例子是这样的:

软件包 entellect.spike.Kryo

导入 java.io.{ByteArrayInputStream, ByteArrayOutputStream}

导入 com.esotericsoftware.kryo.Kryo 导入 com.esotericsoftware.kryo.io.{输入,输出} 导入 com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer

object KryoSpike extends App {

  val kryo = new Kryo()
  kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableMapSerializer])
  kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableMapSerializer])

  val testin = Map("id" -> "objID", "field1" -> "field1Value")

  val outStream = new ByteArrayOutputStream()
  val output = new Output(outStream, 4096)
  kryo.writeClassAndObject(output, testin)
  output.flush()


  val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
  val testout = kryo.readObject(input, classOf[Map[String,String]])

  println(testout.toString)

}

但后来我收到以下错误:

线程“主”com.esotericsoftware.kryo.KryoException 中的异常:无法创建类(缺少无参数构造函数):com.esotericsoftware.kryo.Kryo$DefaultInstantiatorStrategy.newInstantiatorOf(Kryo.java: 1319) 在 com.esotericsoftware.kryo.Kryo.newInstantiator(Kryo.java:1127) 在 com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer.read (ScalaMapSerializers.scala:75) 在 com.romix.scala.serialization.kryo.ScalaImmutableMapSerializer.read(ScalaMapSerializers.scala:69) 在 com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:709) 在 entellect.spike。 Kryo.KryoSpike$.delayedEndpoint$entellect$spike$Kryo$KryoSpike$1(KryoSpike.scala:25) 在 entellect.spike.Kryo.KryoSpike$delayedInit$body。在 scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12) 在 scala.App $$anonfun$main$1.apply(App.scala:76) at scala.App$$anonfun$main$1.apply(App.scala:76) at scala.collection.immutable.List.foreach(List.scala:392 ) 在 scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 在 scala.App$class.main(App.scala:76) 在 entellect.spike.Kryo.KryoSpike$.main(KryoSpike.scala :10) 在 entellect.spike.Kryo.KryoSpike.main(KryoSpike.scala)scala:76) 在 scala.collection.immutable.List.foreach(List.scala:392) 在 scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 在 scala.App$class.main(App .scala:76) 在 entellect.spike.Kryo.KryoSpike$.main(KryoSpike.scala:10) 在 entellect.spike.Kryo.KryoSpike.main(KryoSpike.scala)scala:76) 在 scala.collection.immutable.List.foreach(List.scala:392) 在 scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35) 在 scala.App$class.main(App .scala:76) 在 entellect.spike.Kryo.KryoSpike$.main(KryoSpike.scala:10) 在 entellect.spike.Kryo.KryoSpike.main(KryoSpike.scala)

编辑1:

我的依赖

  "org.apache.spark" % "spark-core_2.11" % "2.3.1",
  "org.apache.spark" % "spark-sql_2.11" % "2.3.1",
  "com.typesafe.akka" %% "akka-stream" % "2.5.16",
  "com.typesafe.akka" %% "akka-http-spray-json" % "10.1.4",
  "com.typesafe.akka" %% "akka-stream-kafka" % "0.22",
  "com.github.romix.akka" %% "akka-kryo-serialization" % "0.5.0"

请注意,我没有使用 Kryo 特定的 Akka 功能,而是将其用作通用序列化框架。火花也一样。没有直接插入 spark 或 akka 配置。

标签: scalaakkaakka-streamkryo

解决方案


解决方案使用

ScalaImmutableAbstractMapSerializer 与 Map

和 writeObject && readObject 方法一起使用。

package entellect.spike.Kryo

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.romix.scala.serialization.kryo._

object KryoSpike extends App {


  val kryo = new Kryo()
  kryo.setRegistrationRequired(false)
  kryo.addDefaultSerializer(classOf[scala.collection.Map[_,_]], classOf[ScalaImmutableAbstractMapSerializer])
  kryo.addDefaultSerializer(classOf[scala.collection.generic.MapFactory[scala.collection.Map]], classOf[ScalaImmutableAbstractMapSerializer])

  val testin = Map("id" -> "objID", "field1" -> "field1Value")

  val outStream = new ByteArrayOutputStream()
  val output = new Output(outStream, 4096)
  kryo.writeObject(output, testin)
  output.flush()


  val input = new Input(new ByteArrayInputStream(outStream.toByteArray), 4096)
  val testout = kryo.readObject(input, classOf[scala.collection.Map[_,_]])

  println(testout.toString)

}

推荐阅读