scala - 为什么迭代RDD后局部变量值不可见?
问题描述
嗨,我正在 scala 中为 apache-spark 编写代码。rdd 迭代完成后,我的局部变量“国家”值没有反映。在检查 rdd 迭代内的条件后,我在国家变量中赋值。直到 rdd 是迭代值在控制从循环值丢失后出来后在国家变量中可用。
import org.apache.spark.sql.SparkSession
import java.lang.Long
object KPI1 {
def main(args:Array[String]){
System.setProperty("hadoop.home.dir","C:\\shivam docs\\hadoop-2.6.5.tar\\hadoop-2.6.5");
val spark=SparkSession.builder().appName("KPI1").master("local").getOrCreate();
val textFile=spark.read.textFile("C:\\shivam docs\\HADOOP\\sample data\\wbi.txt").rdd;
val splitData=textFile.map{
line=>{
val token=line.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)");
(token(0),token(10).replace("\"","").replace(",", ""));
}
};
// splitData.max()._2;
var maxele=0l;
var index=0;
var country="";
splitData.foreach(println);
for(ele<-splitData){
val data=Long.parseLong(ele._2);
if(maxele<data){
maxele=data;
println(maxele);
country=ele._1;
println(country);
}
};
println("***************************** "+country+maxele);
spark.close()
}
}
国家变量值不应具有默认值。
解决方案
两者皆为for
广foreach
术。这意味着执行将发生在多个执行程序上,这就是为什么您会获得某些线程的默认值。我在具有 4 个执行程序的单节点集群中运行我的示例代码,您可以看到执行发生在两个不同的执行程序中(线程 id 很明显)
样本
val baseRdd = spark.sparkContext.parallelize(Seq((1, 2), (3, 4)))
for (h <- baseRdd) {
println( "Thread id " + Thread.currentThread().getId)
println("Value "+ h)
}
输出
Thread id 48
Value (1,2)
Thread id 50
Value (3,4)
如果您仍想获得预期的结果,请按照以下任一选项
1.将您的火花上下文配置更改为
master("local[1]")
. 这将使用单个执行器运行您的工作。
collect()
你splitData
在表演之前for(ele<-splitData){...}
注意这两个选项都严格用于测试或实验目的,它不适用于大型数据集。
推荐阅读
- python - Dask/Pandas 是否支持基于依赖于其他行的复杂条件删除组中的行?
- ios - 如何获取搜索控制器文本字段的大小(用于调整占位符的大小)?
- c# - 如何在 C# v4 中从 LUIS 无意图调用 QnA 制造商
- javascript - 如何迭代一个迭代器?
- servlets - 在 WebApp 中加密密码
- php - 如何在 Laravel 中的用户会话到期时更新数据库中的字段
- python - Python Ctypes - 加载 dll 引发 OSError:[WinError 193] %1 不是有效的 Win32 应用程序
- node.js - nodemon 应用程序崩溃 - 等待文件更改
- sql-server - 如何有条件地加入 T-SQL 存储过程
- java - 如何在java中使用服务名连接到oracle