首页 > 解决方案 > 为什么迭代RDD后局部变量值不可见?

问题描述

嗨,我正在 scala 中为 apache-spark 编写代码。rdd 迭代完成后,我的局部变量“国家”值没有反映。在检查 rdd 迭代内的条件后,我在国家变量中赋值。直到 rdd 是迭代值在控制从循环值丢失后出来后在国家变量中可用。

import org.apache.spark.sql.SparkSession
import java.lang.Long

object KPI1 {

  def main(args:Array[String]){
    System.setProperty("hadoop.home.dir","C:\\shivam docs\\hadoop-2.6.5.tar\\hadoop-2.6.5");

    val spark=SparkSession.builder().appName("KPI1").master("local").getOrCreate();
    val textFile=spark.read.textFile("C:\\shivam docs\\HADOOP\\sample data\\wbi.txt").rdd;

    val splitData=textFile.map{
      line=>{
        val token=line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");
        (token(0),token(10).replace("\"","").replace(",", ""));
      }
    };

   // splitData.max()._2;
   var maxele=0l;
    var index=0;
    var country="";
    splitData.foreach(println);
    for(ele<-splitData){
      val data=Long.parseLong(ele._2);
      if(maxele<data){
        maxele=data;
        println(maxele);
        country=ele._1;
        println(country);
      }
    };
println("***************************** "+country+maxele);

spark.close()
  }
}

国家变量值不应具有默认值。

标签: scalaapache-spark

解决方案


两者皆为for广foreach术。这意味着执行将发生在多个执行程序上,这就是为什么您会获得某些线程的默认值。我在具有 4 个执行程序的单节点集群中运行我的示例代码,您可以看到执行发生在两个不同的执行程序中(线程 id 很明显)

样本

val baseRdd = spark.sparkContext.parallelize(Seq((1, 2), (3, 4)))

for (h <- baseRdd) {
  println( "Thread id " + Thread.currentThread().getId)
  println("Value "+ h)
} 

输出

Thread id 48
Value (1,2)
Thread id 50
Value (3,4)

如果您仍想获得预期的结果,请按照以下任一选项

1.将您的火花上下文配置更改为 master("local[1]"). 这将使用单个执行器运行您的工作。

  1. collect()splitData在表演之前for(ele<-splitData){...}

注意这两个选项都严格用于测试或实验目的,它不适用于大型数据集。


推荐阅读