scala - Spark工会很重
问题描述
val billRDD = sc.textFile("file/cdr/*/2018/07/27/20/*").map(line => line.split("\\|"))
val areaCodeAll: RDD[((String, String, String), List[List[String]])] = billRDD.repartition(24).map(x=>((x(76),x(77),x(78)),List(x(3),x(53),x(54),x(20),x(44),x(61),x(62),x(22),x(63),x(24),x(64),x(65),x(47),x(48),x(60),x(68),x(67))))
.combineByKey(
(v: List[String]) => List(v),
(c:List[List[String]],v:List[String]) => v +: c,
(c1:List[List[String]],c2:List[List[String]]) => List.concat(c1,c2)
)
areaCodeAll.cache()
val provinceAll = areaCodeAll.map(x => (x._1._1,x._2)).filter(_._1!="-1").reduceByKey(_:::_)
val cityAll = areaCodeAll.map(x => (x._1._2,x._2)).filter(_._1!="-1").reduceByKey(_:::_)
val countyAll = areaCodeAll.map(x => (x._1._3,x._2)).filter(_._1!="-1")
val dataByAreaCode = provinceAll.union(cityAll).union(countyAll)
我的源文件号是48,大小是871.5M。这是文件信息:
这是我的火花壳设置:
spark-shell --master yarn-client spark.default.parallelism=600 --conf spark.rdd.compress=true --queue=queue_qoezy --num-executors 48 --executor-cores 4 --executor-memory 16g --driver-memory 8g --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
实际上,这个程序的运行时间很长,最多 5 分钟。这是 Spark Web UI 的 DAG 可视化:
如何通过scala优化我的spark程序?帮助,谢谢!
解决方案
据我所见,我很确定花费最多时间的任务实际上是 reduceByKey 而不是联合,这是有道理的,因为它是一个 shuffle(如果你不知道 shuffle 是如何工作的,请在评论中问我)。您可以看到在第 8 阶段设置了 reduce 操作。据我所知,至少与 reduce 相比,union 应该非常快。
PS如果你有足够的执行者你应该看看treeReduce。
推荐阅读
- wpf - 使用带有 IsNavigationTarget 的 Prism 处理嵌套视图,可以返回 false
- regex - 匹配只有两个结束字符的域的正则表达式?
- spring - BeanDefinitionStoreException: 无法解析配置类...因为它不存在
- c++ - 使用列表初始化器与构造器的复杂<>?
- azure - 相同操作的 Powershell cmdlet 差异
- java - Java 递归 - 基于子属性捕获父类别
- python - 使用数据框绘制子图
- linux - 如果命令超过 1 行,如何在 shell 或 bash 中输入新行/移动到下一行而不执行命令?
- linux - 如何解冻用户的内存限制?
- reactjs - 美人鱼类图的自定义箭头