apache-spark - 为什么火花保存工作有 4 个阶段?
问题描述
我正在尝试将 Dataframe 保存到 HDFS 位置。但是我的保存需要很长时间。在此之前的操作是使用 Spark SQL 连接两个表。需要知道为什么保存有四个阶段以及如何提高性能。我已在此处附加了阶段列表Spark UI Image of the job。我还附上了我的代码片段。
火花代码:
该函数从主类获取数据,models 变量从 XML 获取表信息数据。最初,它获取源表的数据,然后尝试从其他连接表中检索数据。
def sourceGen(spark: SparkSession,
minBatchLdNbr: Int,
maxBatchLdNbr: Int,
batchLdNbrList: String,
models: (GModel, TModel, NModel)): Unit = {
val configJson = models._3
val gblJson = models._1
println("Source Loading started")
val sourceColumns = configJson.transformationJob.sourceDetails.sourceSchema
val query = new StringBuilder("select ")
sourceColumns.map { SrcColumn =>
if (SrcColumn.isKey == "nak") {
query.append(
"cast(" + SrcColumn.columnExpression + " as " + SrcColumn.columnDataType + ") as " + SrcColumn.columnName + ",")
}
}
var tableQuery: String =
if (!configJson.transformationJob.sourceDetails.sourceTableSchemaName.isEmpty) {
if (!batchLdNbrList.trim.isEmpty)
query.dropRight(1) + " from " + configJson.transformationJob.sourceDetails.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr + "or batch_ld_nbr in ( " + batchLdNbrList + " )"
else
query.dropRight(1) + " from " + configJson.transformationJob.sourceDetails.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr
} else {
if (!batchLdNbrList.trim.isEmpty)
query.dropRight(1) + " from " + gblJson.gParams.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr + "or batch_ld_nbr in ( " + batchLdNbrList + " )"
else
query.dropRight(1) + " from " + gblJson.gParams.sourceTableSchemaName + "." + configJson.transformationJob.sourceDetails.sourceTableName + " where batch_ld_nbr > " + minBatchLdNbr + " and batch_ld_nbr <= " + maxBatchLdNbr
}
if (minBatchLdNbr == 0 && maxBatchLdNbr == 0) {
tableQuery = tableQuery.split("where")(0)
}
println("Time"+LocalDateTime.now());
val tableQueryDf: DataFrame = spark.sql(tableQuery)
println("tableQueryDf"+tableQueryDf);
println("Time"+LocalDateTime.now());
println("Source Loading ended")
println("Parent Loading Started")
val parentColumns = configJson.transformationJob.sourceDetails.parentTables
val parentSourceJoinDF: DataFrame = if (!parentColumns.isEmpty) {
parentChildJoin(tableQueryDf,
parentColumns,
spark,
gblJson.gParams.pSchemaName)
} else {
tableQueryDf
}
println("tableQueryDf"+tableQueryDf);
println("Parent Loading ended")
println("Key Column Generation Started")
println("Time"+LocalDateTime.now());
val arrOfCustomExprs = sourceColumns
.filter(_.isKey.toString != "nak")
.map(
f =>
functions
.expr(f.columnExpression)
.as(f.columnName)
.cast(f.columnDataType))
val colWithExpr = parentSourceJoinDF.columns.map(f =>
parentSourceJoinDF.col(f)) ++ arrOfCustomExprs
val finalQueryDF = parentSourceJoinDF.select(colWithExpr: _*)
println("finalQueryDF"+finalQueryDF);
println("Time"+LocalDateTime.now());
keyGenUtils.writeParquetTemp(
finalQueryDF,
configJson.transformationJob.globalParams.hdfsInterimPath + configJson.transformationJob.sourceDetails.sourceTableName + "/temp_" + configJson.transformationJob.sourceDetails.sourceTableName
)
println("PrintedTime"+LocalDateTime.now());
println("Key Column Generation Ended")
}
下面的代码用于从连接表中检索数据。
private def parentChildJoin(tableQueryDf: DataFrame,
ptJoin: Array[ParentTables],
sparkSession: SparkSession,
gParentSchema: String): DataFrame = {
if (ptJoin.isEmpty) {
tableQueryDf
} else {
val parentJoin = ptJoin.head
val columns = new StringBuilder("select ")
for (ptCols <- parentJoin.columns) {
columns.append(
ptCols.columnExpression + " as " + ptCols.columnName + ",")
}
val statement = columns.dropRight(1)
if (!parentJoin.pSchemaName.isEmpty) {
statement.append(
" from " + parentJoin.pSchemaName + "." + parentJoin.pTableName)
} else {
statement.append(" from " + gParentSchema + "." + parentJoin.pTableName)
}
println("Time"+LocalDateTime.now());
println("parentJoin.pTableName"+parentJoin.pTableName);
val pQueryDF =
if (parentJoin.pTableName.equalsIgnoreCase("order_summary_si_fact_t")) {
val ordCalDt = "ord_cal_dt"
val distinctDates = tableQueryDf
.selectExpr(ordCalDt)
.distinct
.collect
.map(_.getAs[String](0))
sparkSession.sql(statement.toString).where(col(ordCalDt).isin(distinctDates: _*)).distinct
} else {
sparkSession.sql(statement.toString).distinct
}
println("Time"+LocalDateTime.now());
//val pQueryDF = sparkSession.sql(statement.toString).distinct
println("statement-"+parentJoin.pTableName+"-"+statement);
parentChildJoin(
tableQueryDf.join(pQueryDF,
parentJoin.pJoinCondition.map(_.sourceKey).toSeq,
parentJoin.joinType),
ptJoin.tail,
sparkSession,
gParentSchema)
}
}
这是写入 HDFS 的函数。
def writeParquetTemp(df: DataFrame, hdfsPath: String): Unit = {
df.write.format("parquet").option("compression", "none").mode(SaveMode.Overwrite).save(hdfsPath)
}
Spark提交配置:
/usr/hdp/2.6.3.0-235/spark2/bin//spark-submit --master yarn --deploy-mode client --driver-memory 30G --executor-memory 25G --executor-cores 6 --conf spark.dynamicAllocation.enabled=true --conf spark.shuffle.service.enabled=true --conf spark.sql.autoBroadcastJoinThreshold=774857600 --conf spark.kryoserializer.buffer.max.mb=512 --conf spark.dynamicAllocation.maxExecutors=40 --conf spark.eventLog.enabled=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.sql.parquet.binaryAsString=true --conf spark.sql.broadcastTimeout=36000 --conf spark.sql.shuffle.partitions=500
解决方案
从attached Image
看起来像 => 阅读后您正在repartition
对Dataset1(1277 个分区)进行操作=>
第 10 阶段 -> 读取并设置“压缩”数据集的阶段边界,该数据集具有 1277 个文件总数 * 每个文件的块大小(近似于最大可用内核)=> 随机写入
第 11 阶段 -> 随机播放此数据集,默认 spark.sql.shuffle.partitions 设置为 500
第 12 阶段 -> 为第二个数据集读取并创建阶段边界,该数据集具有 348 个文件总数 * 每个文件的块大小(近似于最大可用内核)=> 随机写入
第 13 阶段 -> 加入这两个数据集并保存在 HDFS 中,默认为 spark.sql.shuffle.partitions,设置为 500
使用 Code,我们可以看到哪里出了问题,但是最后一次加入的地方很大Shuffle R%ead
,所以可能你想降低你的默认随机分区。
推荐阅读
- c# - 将 jQuery 字典对象传递给输入字段
- angular - 在角度中使用ngrx时无法读取未定义的属性'showContent'
- sql - 使用现有数据在 Oracle 中将日期列转换为 varchar 列
- c - 将文件存储在块中(以二进制格式)并使用 c 检索它
- c++ - 为 amazon-kinesis-video-streams-producer-sdk-cpp 构建依赖项时出错
- python - 循环通过 GroupBy DataFrame 的有效方法
- javascript - 尝试通过 id 更改 div 样式时,为什么我无法读取 null 的属性“样式”?
- javascript - 使用正则表达式在段落中用空格或换行替换多个下划线
- django - 芹菜任务无法访问docker中的卷文件
- java - 当我使用类型键盘保护对话框时,我的屏幕停止工作