apache-spark - Spark stage takes too long - 2 executors do "all" the work
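Problem description
I have spent the past day trying to solve this problem, without success.
The problem I'm facing
I am reading a Parquet file of about 2 GB. The initial read produces 14 partitions, which are eventually split into 200 partitions. I run a seemingly simple SQL query that takes more than 25 minutes, about 22 of which are spent in a single stage. In the Spark UI I can see that all the computation ends up on about 2 to 4 executors, with a lot of shuffling. I have no idea what is going on. Any help would be greatly appreciated.
Setup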
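- Spark environment - Databricks
- Cluster mode - Standard
- Databricks Runtime version - 6.4 ML (includes Apache Spark 2.4.5, Scala 2.11)
- Cloud - Azure
- Worker type - 56 GB, 16 cores per machine. Minimum 2 machines
- Driver type - 112 GB, 16 cores
Notebook
Cell 1: Helper functions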
load_data = function(path, type) {
  input_df = read.df(path, type)
  input_df = withColumn(input_df, "dummy_col", 1L)
  createOrReplaceTempView(input_df, "__current_exp_data")

  ## Helper function: run a query, then register the result as a temp view
  transformation_helper = function(sql_query, destination_table) {
    createOrReplaceTempView(sql(sql_query), destination_table)
  }

  ## Transformation 0: Calculate max date, used for calculations later on
  transformation_helper(
    "SELECT 1L AS dummy_col, MAX(Date) max_date FROM __current_exp_data",
    destination_table = "__max_date"
  )

  ## Transformation 1: Make initial column calculations
  transformation_helper(
    "
    SELECT
      cId AS cId
      , date_format(Date, 'yyyy-MM-dd') AS Date
      , date_format(DateEntered, 'yyyy-MM-dd') AS DateEntered
      , eId
      , (CASE WHEN isnan(tSec) OR isnull(tSec) THEN 0 ELSE tSec END) AS tSec
      , (CASE WHEN isnan(eSec) OR isnull(eSec) THEN 0 ELSE eSec END) AS eSec
      , approx_count_distinct(eId) OVER (PARTITION BY cId) AS dc_eId
      , COUNT(*) OVER (PARTITION BY cId, Date) AS num_rec
      , datediff(Date, DateEntered) AS analysis_day
      , datediff(max_date, DateEntered) AS total_avail_days
    FROM __current_exp_data
    CROSS JOIN __max_date ON __current_exp_data.dummy_col = __max_date.dummy_col
    ",
    destination_table = "current_exp_data_raw"
  )

  ## Transformation 2: Drop rows whose Date is not valid, and keep only rows
  ## whose cId has a single eId and a single record per (cId, Date)
  transformation_helper(
    "
    SELECT
      cId
      , Date
      , DateEntered
      , eId
      , tSec
      , eSec
      , analysis_day
      , total_avail_days
      , CASE WHEN analysis_day == 0 THEN 0 ELSE floor((analysis_day - 1) / 7) END AS week
      , CASE WHEN total_avail_days < 7 THEN NULL ELSE floor(total_avail_days / 7) - 1 END AS avail_week
    FROM current_exp_data_raw
    WHERE
      isnotnull(Date) AND
      NOT isnan(Date) AND
      Date >= DateEntered AND
      dc_eId == 1 AND
      num_rec == 1
    ",
    destination_table = "main_data"
  )

  cacheTable("current_exp_data_raw")
  cacheTable("main_data")
}

spark_sql_as_data_table = function(query) {
  data.table(collect(sql(query)))
}

get_distinct_weeks = function() {
  spark_sql_as_data_table("SELECT week FROM main_data GROUP BY week")
}
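The week arithmetic in Transformation 2 is easy to misread, so here is a small base-R sketch that mirrors its two `CASE` expressions on a few sample day counts. The function names `week_of` and `avail_week_of` are hypothetical, and Spark's `NULL` is represented by R's `NA`.

```r
# Mirrors the CASE expressions from Transformation 2 (illustration only).
# week: day 0 and days 1-7 fall in week 0, days 8-14 in week 1, and so on.
week_of <- function(analysis_day) {
  ifelse(analysis_day == 0, 0, floor((analysis_day - 1) / 7))
}

# avail_week: NA (Spark NULL) when fewer than 7 days are available,
# otherwise the number of complete weeks minus one.
avail_week_of <- function(total_avail_days) {
  ifelse(total_avail_days < 7, NA, floor(total_avail_days / 7) - 1)
}

days <- c(0, 1, 7, 8, 14, 15)
print(data.frame(day = days, week = week_of(days)))
print(avail_week_of(c(3, 7, 13, 14, 20)))
```

With these inputs, `week` is 0, 0, 0, 1, 1, 2 and `avail_week` is NA, 0, 0, 1, 1.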
Cell 2: Call the helper functions that trigger the long-running task
library(data.table)
library(SparkR)
spark = sparkR.session(sparkConfig = list())
load_data("/mnt/public-dir/file_0000000.parquet", "parquet")
set.seed(1234)
get_distinct_weeks()
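[Screenshot: DAG of the long-running stage]
[Screenshot: statistics of the long-running stage]
Logs
I trimmed the logs to show only the entries below, each of which appears many times.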
BlockManager: Found block rdd_22_113 locally
CoarseGrainedExecutorBackend: Got assigned task 812
ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
InMemoryTableScanExec: Predicate (dc_eId#61L = 1) generates partition filter: ((dc_eId.lowerBound#622L <= 1) && (1 <= dc_eId.upperBound#621L))
InMemoryTableScanExec: Predicate (num_rec#62L = 1) generates partition filter: ((num_rec.lowerBound#627L <= 1) && (1 <= num_rec.upperBound#626L))
InMemoryTableScanExec: Predicate isnotnull(Date#57) generates partition filter: ((Date.count#599 - Date.nullCount#598) > 0)
InMemoryTableScanExec: Predicate isnotnull(DateEntered#58) generates partition filter: ((DateEntered.count#604 - DateEntered.nullCount#603) > 0)
MemoryStore: Block rdd_17_104 stored as values in memory (estimated size <VERY SMALL NUMBER < 10> MB, free 10.0 GB)
ShuffleBlockFetcherIterator: Getting 200 non-empty blocks including 176 local blocks and 24 remote blocks
ShuffleBlockFetcherIterator: Started 4 remote fetches in 1 ms
UnsafeExternalSorter: Thread 254 spilling sort data of <Between 1 and 3 GB> to disk (3 times so far)
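One hypothesis consistent with these logs, though not confirmed anywhere in the question: the window functions in Transformation 1 (`approx_count_distinct(eId) OVER (PARTITION BY cId)` and `COUNT(*) OVER (PARTITION BY cId, Date)`) hash-partition every row by its key into the 200 shuffle partitions (200 is the default value of `spark.sql.shuffle.partitions`), so all rows that share a cId are processed by the same task. If a few cId values own most of the rows, a few tasks do most of the work and spill to disk. The base-R sketch below illustrates that effect with made-up data; the key counts and the modulo partitioner (a stand-in for Spark's Murmur3-based hash partitioning) are assumptions for illustration only.

```r
# Illustration only: hypothetical cId data, not taken from the question.
set.seed(1234)
n_partitions <- 200L  # Spark's default spark.sql.shuffle.partitions

# Three "heavy" cId values own 300,000 rows each; 100,000 more rows are
# spread uniformly over cId values 1..1000.
heavy_ids <- c(7L, 42L, 99L)
ids <- c(rep(heavy_ids, each = 300000L),
         sample(1:1000, 100000L, replace = TRUE))

# Stand-in partitioner: Spark uses pmod(murmur3(key), n); plain modulo keeps
# this sketch dependency-free while preserving "same key -> same partition".
partition_of <- ids %% n_partitions
rows_per_partition <- tabulate(partition_of + 1L, nbins = n_partitions)

# Share of all rows handled by the 4 busiest partitions (tasks).
top4_share <- sum(sort(rows_per_partition, decreasing = TRUE)[1:4]) / length(ids)
cat(sprintf("Busiest 4 of %d partitions hold %.1f%% of the rows\n",
            n_partitions, 100 * top4_share))
```

Whether the real data is skewed this way could be checked with a grouped count such as `SELECT cId, COUNT(*) AS n FROM __current_exp_data GROUP BY cId ORDER BY n DESC LIMIT 20`.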