首页 > 解决方案 > Jackson 创建的 Java Iterator 问题破坏了 Scala Flink 应用程序

问题描述

有一个 Scala Flink 应用程序,我在其中使用 Jackson 库解析 JSON。解析由自定义方法处理,它使用惰性启动概念来保持快速。

现在,无论出于何种原因,在 Flink 管道中进一步传递具有惰性值的模型会导致一些奇怪的错误,因为util.Iterator这是读取 JSON 的主干。我怀疑这个问题实际上可能来自,Kryo但我不知道如何确认。值得注意的是,.toList在同一个(flink)中急切地初始化模型(用 )map解决了这个问题。但事实并非如此,我想进一步传递我的懒惰模型。

最后,我提供了一个带有演示代码的存储库,但我还想在 StackOverflow 中提供所有详细信息。

示例模型和解析定义:

case class Root(items: Collection[Data])
case class Data(data: Collection[Double])

def toRoot(node: JsonNode): Root = {
    val data: util.Iterator[JsonNode] = if (node.hasNonNull("items")) node.get("items").elements() else node.elements()
    val items: Collection[Data] = data.asScala.map(x => toData(x))
    Root(items)
}

JSON数据类似于:

{
  "items": [
    {
      "data": [
        11.71476355252127,
        48.342882259940176,
        507.3,
        11.714791605037252,
        ...

并在一项工作中完成所有map工作:

env.fromCollection(Seq(input))
   .map(i => flatten(read(i)))
   .print()

但进一步传递失败:

env.fromCollection(Seq(input))
   .map(i => read(i))
   .map(i => flatten(i))
   .print()

有错误:

Caused by: java.util.ConcurrentModificationException
    at java.util.ArrayList$Itr.checkForComodification(ArrayList.java:911)
    at java.util.ArrayList$Itr.next(ArrayList.java:861)
    at scala.collection.convert.Wrappers$JIteratorWrapper.next(Wrappers.scala:43)
Caused by: java.lang.NullPointerException
    at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:80)
    at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:488)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:57)
    ... 29 more

我创建了一个演示项目,所有这些示例都准备好使用 Scala 2.11 和 2.12 进行测试,因为它实际上提供了不同的结果可用这里

标签: scalajacksoniteratorapache-flinkkryo

解决方案


推荐阅读