首页 > 解决方案 > Scala fold for RDD [String] 行为怪异

问题描述

所以这是我的代码:

var exceptions: String = ""
  val count = failures.fold("0")((x1, x2) => {
    println(s"x1: $x1 and x2: $x2")
    if (x1 != "-433" && x2 != "-433") {
      (x1.toInt + x2.toInt).toString
    } else {
      println(s"before: $exceptions")
      exceptions = exceptions + ", " + "There is an exception in processing, check the logs of executors for actual information"
      println(s"after: $exceptions")
      if (x1 == "-433") {
        if (x2 != "-433") {
          x2
        } else "0"
      }else {
        if (x2 == "-433") {
           x1
        }else "0"
      }
    }
  })

计数是一个 RDD[字符串]。最奇怪的是 execptions 以“”结尾。以下是日志:

x1:0 和 x2:-433

前:

after: , 处理中出现异常,查看executors日志获取实际信息

x1:0 和 x2:0

最后 :

标签: scalaapache-sparkfold

解决方案


Spark RDD 是分布式数据结构,这意味着 RDD 的元素不在同一个节点上,RDD 上的转换不会发生在同一个节点上。

您所有的转换函数都包装在它们的闭包中,然后序列化,然后发送到执行程序节点,在执行程序节点反序列化,在执行程序节点执行。包装的闭包获得使用的上下文对象的“副本”。

即使您在本地节点上运行 spark,它也会在本地节点上运行多个执行程序,并且它们的行为方式相同。

因此,您的函数的每次执行都会获得自己的exceptions变量副本。因此,您的exception字符串将永远不会以您期望的方式更新。


推荐阅读