首页 > 解决方案 > Spark工会很重

问题描述

val billRDD = sc.textFile("file/cdr/*/2018/07/27/20/*").map(line => line.split("\\|"))
val areaCodeAll: RDD[((String, String, String), List[List[String]])] = billRDD.repartition(24).map(x=>((x(76),x(77),x(78)),List(x(3),x(53),x(54),x(20),x(44),x(61),x(62),x(22),x(63),x(24),x(64),x(65),x(47),x(48),x(60),x(68),x(67))))
  .combineByKey(
    (v: List[String]) => List(v),
    (c:List[List[String]],v:List[String]) => v +: c,
    (c1:List[List[String]],c2:List[List[String]]) => List.concat(c1,c2)
  )
areaCodeAll.cache()
val provinceAll = areaCodeAll.map(x => (x._1._1,x._2)).filter(_._1!="-1").reduceByKey(_:::_)
val cityAll = areaCodeAll.map(x => (x._1._2,x._2)).filter(_._1!="-1").reduceByKey(_:::_)
val countyAll = areaCodeAll.map(x => (x._1._3,x._2)).filter(_._1!="-1")
val dataByAreaCode = provinceAll.union(cityAll).union(countyAll)

我的源文件号是48,大小是871.5M。这是文件信息:

在此处输入图像描述

这是我的火花壳设置:

spark-shell --master yarn-client spark.default.parallelism=600 --conf spark.rdd.compress=true  --queue=queue_qoezy --num-executors 48 --executor-cores 4 --executor-memory 16g --driver-memory 8g --conf spark.serializer=org.apache.spark.serializer.KryoSerializer

实际上,这个程序的运行时间很长,最多 5 分钟。这是 Spark Web UI 的 DAG 可视化: 在此处输入图像描述

程序总是卡在最后一个任务: 在此处输入图像描述

如何通过scala优化我的spark程序?帮助,谢谢!

标签: scalaapache-sparkrdd

解决方案


据我所见,我很确定花费最多时间的任务实际上是 reduceByKey 而不是联合,这是有道理的,因为它是一个 shuffle(如果你不知道 shuffle 是如何工作的,请在评论中问我)。您可以看到在第 8 阶段设置了 reduce 操作。据我所知,至少与 reduce 相比,union 应该非常快。

PS如果你有足够的执行者你应该看看treeReduce。


推荐阅读