scala - Jackson 创建的 Java Iterator 问题破坏了 Scala Flink 应用程序
问题描述
有一个 Scala Flink 应用程序,我在其中使用 Jackson 库解析 JSON。解析由自定义方法处理,它使用惰性启动概念来保持快速。
现在,无论出于何种原因,在 Flink 管道中进一步传递具有惰性值的模型会导致一些奇怪的错误,因为util.Iterator
这是读取 JSON 的主干。我怀疑这个问题实际上可能来自,Kryo
但我不知道如何确认。值得注意的是,.toList
在同一个(flink)中急切地初始化模型(用 )map
解决了这个问题。但事实并非如此,我想进一步传递我的懒惰模型。
最后,我提供了一个带有演示代码的存储库,但我还想在 StackOverflow 中提供所有详细信息。
示例模型和解析定义:
case class Root(items: Collection[Data])
case class Data(data: Collection[Double])
def toRoot(node: JsonNode): Root = {
val data: util.Iterator[JsonNode] = if (node.hasNonNull("items")) node.get("items").elements() else node.elements()
val items: Collection[Data] = data.asScala.map(x => toData(x))
Root(items)
}
JSON数据类似于:
{
"items": [
{
"data": [
11.71476355252127,
48.342882259940176,
507.3,
11.714791605037252,
...
并在一项工作中完成所有map
工作:
env.fromCollection(Seq(input))
.map(i => flatten(read(i)))
.print()
但进一步传递失败:
env.fromCollection(Seq(input))
.map(i => read(i))
.map(i => flatten(i))
.print()
有错误:
- 斯卡拉 2.11
Caused by: java.util.ConcurrentModificationException
at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
at java.util.ArrayList$Itr.next(ArrayList.java:861)
at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
- 斯卡拉 2.12
Caused by: java.lang.NullPointerException
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
... 29 more
我创建了一个演示项目,所有这些示例都准备好使用 Scala 2.11 和 2.12 进行测试,因为它实际上提供了不同的结果可用这里
解决方案
推荐阅读
- database - Oracle 数据库返回日期为 1951 而不是 2051
- python - 在Python中使用循环根据字符串值修改新列中的行值
- mysql - MYSQL 选择不适用于以前的版本
- r - 删除列的数字(零)并在数据框中创建一个新列
- python - ModuleNotFoundError:没有名为“fcntl”的模块
- javascript - 无法读取未定义的属性“onRowClickHandler”
- firefox - Firefox:关闭所有标签并*最后激活*
- reactjs - 访问css模块相关的问题
- javascript - JavaScript ES6 - 如何在类之间传递 API URLSearchParams
- vue.js - Vue 的 nextTick 之后如何拍摄 Storybook 的 Storyshots?