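kotlin - How to write a Kryo serializer
Problem description
I tried to solve the following problem by providing a Kryo Serializer, but it still does not work. The serializer for ModelCom is not picked up, and none of the messages from the print calls are shown.
I am using Apache Flink 1.9.0 and Apache Jena 3.10.0.
My code in Kotlin: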
val serializer = object : Serializer<Model>() {
    override fun write(kryo: Kryo, output: Output?, obj: Model?) {
        print("write")
        kryo.writeClassAndObject(output, obj)
    }

    override fun read(kryo: Kryo, input: Input?, type: Class<Model>?): Model {
        print("read")
        val m = kryo.readObject(input, Model::class.java)
        return m
    }
}
ExecutionContext.see.config.registerTypeWithKryoSerializer(ModelCom::class.java, serializer::class.java)
Error
Exception in thread "main" org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot serialize operator object class org.apache.flink.streaming.api.operators.SimpleUdfStreamOperatorFactory.
at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:222)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexConfig(StreamingJobGraphGenerator.java:460)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:272)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:243)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:243)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:207)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:159)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:94)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:737)
at org.apache.flink.optimizer.plan.StreamingPlan.getJobGraph(StreamingPlan.java:40)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:86)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489)
at core.EgressEngine.start(EgressEngine.kt:187)
at core.EgressEngineKt.main(EgressEngine.kt:45)
Caused by: java.io.NotSerializableException: org.apache.jena.rdf.model.impl.ModelCom
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)
at java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)
at java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)
at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)
at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:586)
at org.apache.flink.util.InstantiationUtil.writeObjectToConfig(InstantiationUtil.java:515)
at org.apache.flink.streaming.api.graph.StreamConfig.setStreamOperatorFactory(StreamConfig.java:219)
... 14 more
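Solution
Jena models are not serializable, so this approach will not work. What you can do instead is send enough serialized data that every instance needing a model can instantiate one itself.
See this thread on the jena-users list for how this was solved for Spark; the underlying problem is the same for any JVM-based distributed computing framework.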
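For Flink, the idea could be sketched roughly as follows. The stack trace fails inside ObjectOutputStream, called from InstantiationUtil.serializeObject, which suggests the operator itself is being Java-serialized while it holds a ModelCom field. That would also explain why the registered Kryo serializer is never invoked. The sketch assumes the model is small enough to ship as a Turtle string; CountStatements, modelToTurtle and turtleToModel are hypothetical names chosen for illustration, not part of Flink or Jena.

```kotlin
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.jena.rdf.model.Model
import org.apache.jena.rdf.model.ModelFactory
import java.io.StringReader
import java.io.StringWriter

// Hypothetical helper: turn a model into a plain String, which is serializable.
fun modelToTurtle(model: Model): String {
    val out = StringWriter()
    model.write(out, "TURTLE")
    return out.toString()
}

// Hypothetical helper: rebuild a model from its Turtle text on the receiving side.
fun turtleToModel(turtle: String): Model {
    val model = ModelFactory.createDefaultModel()
    model.read(StringReader(turtle), null, "TURTLE")
    return model
}

// Hypothetical operator: only `turtle` (a String) travels with the function.
class CountStatements(private val turtle: String) : RichMapFunction<String, Long>() {

    // @Transient keeps the non-serializable Model out of Java serialization.
    @Transient
    private lateinit var model: Model

    // Called on each parallel instance after the function has been deserialized.
    override fun open(parameters: Configuration) {
        model = turtleToModel(turtle)
    }

    // Counts the statements whose subject is the given URI.
    override fun map(subjectUri: String): Long =
        model.createResource(subjectUri).listProperties().toList().size.toLong()
}
```

A stream would then use something like stream.map(CountStatements(modelToTurtle(myModel))), where myModel is whatever model the job builds on the client side. If the model is large, shipping a file path or endpoint URL and loading from it in open() may be a better fit than embedding the text.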