apache-spark - 剂量火花累加器导致 saveAsTextFile() 进入一个分区?
问题描述
我定义了一个函数:
def setJsonPushIndex(spark: SparkSession, currentdate: String, jsonPushInfo: RDD[(String, String)]): RDD[String] =
{
val sc = spark.sparkContext
val acc = new LongAccumulator()
sc.register(acc, "myaccumulator")
val jsonPushWithIndex = jsonPushInfo.map(x =>
{
acc.add(1)
val sendhour = x._2.toString
val index = pushUtil.toIndex(acc.value.toString)
var mid = "BI" + currentdate + sendhour + index
if (sendhour.toInt < pushUtil.getNextHour().toInt)
{
mid = "BI" + pushUtil.getNextday() + sendhour + index
}
mid + "\u0001" +
"\"mid\": " + "\"" + mid + "\"," +
x._1
}
)
jsonPushWithIndex
}
然后我调用主函数:
val json_push_res = setJsonPushIndex(spark, currentdate, json_pushInfo)
val jsonResultPath= "/jc/algorithm/NewUserPushTest/results/" + pushUtil.NowDate() + "/"
json_push_res.take(12).foreach(println)
json_push_res.saveAsTextFile(jsonResultPath)
原来我的目录“part-00000”中只有一个分区。我的命令是
spark-submit --master yarn --num-executors 5 --executor-cores 2 --executor-memory 5G --driver-memory 10G --class "apppush.NewUserPush_V2" /home/ilambda/lyj/test2.jar
结果数约为 30000。
结果数是小到最后只有一个分区还是Accumulator
导致这个结果?
解决方案
的使用Accumulators
不会影响DAG
,因此不会导致洗牌或合并。
因为jsonPushInfo
仅使用窄转换处理
jsonPushInfo.map(x =>
...
)
它也不会改变分区的数量。
因此我们可以得出结论,jsonPush
从一开始就只有一个分区。
推荐阅读
- java - RecyclerView 项目中的 ProgressBar 在 notifyDataSetChanged 上没有动画
- qt - 在析构函数中终止 QProcess
- jspdf - html2canvas:无法将 fullcalendar-scheduler 生成为 pdf
- ibm-cloud - 如何为 IBM Cloud 中的供应服务获取有意义的别名或名称?
- javascript - 测试 axios 出现网络错误
- jq - 使用 jq 从 json 中提取字段
- css - Bootstrap3 药丸,使用 CSS 悬停 li 项目
- javascript - 用于电子、react-native 和 NodeJS 应用程序的嵌入式数据库?
- python - 使用 Flask 错误处理程序一次捕获任何 Werkzeug 异常
- php - 如何删除 TYPO3 8 LTS 中内容/页面的“翻译为”?