Spark stage taking too long - 2 executors doing 'all' the work

Problem description

I have been trying to solve this problem for the past day, without success.

The problem I am facing

I am reading a Parquet file that is about 2 GB in size. The initial read is 14 partitions, which then gets split into 200 partitions. I run a seemingly simple SQL query that takes over 25 minutes, of which roughly 22 minutes are spent in a single stage. In the Spark UI I can see that all of the computation ends up being pushed onto about 2 to 4 executors, with a lot of shuffling. I have no idea what is going on. Any help would be greatly appreciated.
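For reference, this is roughly how I confirm the partition counts (a minimal sketch, using the same path as in cell 2 below; getNumPartitions and sparkR.conf are just standard SparkR calls I use for checking, not part of the actual notebook):

library(SparkR)
sparkR.session()

## Initial read of the ~2 GB Parquet file: reports 14 partitions for me
input_df = read.df("/mnt/public-dir/file_0000000.parquet", "parquet")
getNumPartitions(input_df)

## The 200 partitions appear after the shuffle; this is the default value
## of spark.sql.shuffle.partitions
sparkR.conf("spark.sql.shuffle.partitions")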

Setup

Notebook

Cell 1: helper functions


load_data = function(path, type) {
  input_df = read.df(path, type)

  input_df = withColumn(input_df, "dummy_col", 1L)
  createOrReplaceTempView(input_df, "__current_exp_data")

  ## Helper function to run query, then save as table
  transformation_helper = function(sql_query, destination_table) {
    createOrReplaceTempView(sql(sql_query), destination_table)
  }

  ## Transformation 0: Calculate max date, used for calculations later on
  transformation_helper(
    "SELECT 1L AS dummy_col, MAX(Date) max_date FROM __current_exp_data",
    destination_table = "__max_date"
  )

  ## Transformation 1: Make initial column calculations
  transformation_helper(
    "
    SELECT
        cId                                                          AS cId
      , date_format(Date, 'yyyy-MM-dd')                              AS Date
      , date_format(DateEntered, 'yyyy-MM-dd')                       AS DateEntered
      , eId

      , (CASE WHEN isnan(tSec) OR isnull(tSec) THEN 0 ELSE tSec END) AS tSec
      , (CASE WHEN isnan(eSec) OR isnull(eSec) THEN 0 ELSE eSec END) AS eSec

      , approx_count_distinct(eId) OVER (PARTITION BY cId)           AS dc_eId
      , COUNT(*)                   OVER (PARTITION BY cId, Date)     AS num_rec

      , datediff(Date, DateEntered)                                  AS analysis_day
      , datediff(max_date, DateEntered)                              AS total_avail_days
    FROM __current_exp_data
    CROSS JOIN __max_date ON __current_exp_data.dummy_col = __max_date.dummy_col
  ",
    destination_table = "current_exp_data_raw"
  )

  ## Transformation 2: Drop row if Date is not valid
  transformation_helper(
    "
    SELECT 
        cId
      , Date
      , DateEntered
      , eId
      , tSec
      , eSec

      , analysis_day
      , total_avail_days
      , CASE WHEN analysis_day     == 0 THEN 0    ELSE floor((analysis_day - 1) / 7)   END AS week
      , CASE WHEN total_avail_days  < 7 THEN NULL ELSE floor(total_avail_days / 7) - 1 END AS avail_week
    FROM current_exp_data_raw
    WHERE 
        isnotnull(Date) AND 
        NOT isnan(Date) AND 
        Date >= DateEntered AND
        dc_eId == 1 AND
        num_rec == 1
  ",
    destination_table = "main_data"
  )
  
  cacheTable("main_data_raw")
  cacheTable("main_data")
}

spark_sql_as_data_table = function(query) {
  data.table(collect(sql(query)))
}

get_distinct_weeks = function() {
  spark_sql_as_data_table("SELECT week FROM current_exp_data GROUP BY week")
}
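To see where the exchanges and window operators end up, I sometimes run something like this after load_data (purely a diagnostic sketch, not part of the notebook):

## Print the physical plan of the final query without collecting anything;
## the Exchange (shuffle) and Window nodes for the PARTITION BY clauses show up here
explain(sql("SELECT week FROM main_data GROUP BY week"), extended = FALSE)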

Cell 2: calling the helper functions, which triggers the long-running job

library(data.table)
library(SparkR)

spark = sparkR.session(sparkConfig = list())

load_data("/mnt/public-dir/file_0000000.parquet", "parquet")

set.seed(1234)

get_distinct_weeks()
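Note that the temp views and cacheTable calls are lazy, so none of the work (the read, the window functions, the join, building the caches) happens until the single collect inside get_distinct_weeks runs. Purely for diagnosis, the caches could be forced up front so that the caching cost and the final GROUP BY show up as separate jobs (a sketch only, not what the notebook currently does):

## Force the cached tables before querying them, so the cost of building the
## cache and the cost of the later GROUP BY are visible as separate jobs
count(tableToDF("current_exp_data_raw"))
count(tableToDF("main_data"))

get_distinct_weeks()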

Long-running stage DAG

Long-running stage statistics

Executor statistics

Logs

I have trimmed the logs to show only the entries that appear many times below


BlockManager: Found block rdd_22_113 locally

CoarseGrainedExecutorBackend: Got assigned task 812

ExternalAppendOnlyUnsafeRowArray: Reached spill threshold of 4096 rows, switching to org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter

InMemoryTableScanExec: Predicate (dc_eId#61L = 1) generates partition filter: ((dc_eId.lowerBound#622L <= 1) && (1 <= dc_eId.upperBound#621L))

InMemoryTableScanExec: Predicate (num_rec#62L = 1) generates partition filter: ((num_rec.lowerBound#627L <= 1) && (1 <= num_rec.upperBound#626L))

InMemoryTableScanExec: Predicate isnotnull(Date#57) generates partition filter: ((Date.count#599 - Date.nullCount#598) > 0)

InMemoryTableScanExec: Predicate isnotnull(DateEntered#58) generates partition filter: ((DateEntered.count#604 - DateEntered.nullCount#603) > 0)

MemoryStore: Block rdd_17_104 stored as values in memory (estimated size <VERY SMALL NUMBER < 10> MB, free 10.0 GB)

ShuffleBlockFetcherIterator: Getting 200 non-empty blocks including 176 local blocks and 24 remote blocks

ShuffleBlockFetcherIterator: Started 4 remote fetches in 1 ms

UnsafeExternalSorter: Thread 254 spilling sort data of <Between 1 and 3 GB> to disk (3  times so far)
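The 200 non-empty blocks in the ShuffleBlockFetcherIterator line match the default spark.sql.shuffle.partitions value of 200, which is also where the 200 partitions mentioned above come from. For completeness, this is the knob I would be adjusting (the value 64 below is arbitrary, just to illustrate; I have not tested it):

## spark.sql.shuffle.partitions defaults to 200, matching the 200 shuffle blocks
## in the logs above; 64 here is an arbitrary illustrative value, not a recommendation
spark = sparkR.session(sparkConfig = list(
  "spark.sql.shuffle.partitions" = "64"
))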

Tags: apache-spark, apache-spark-sql, databricks, azure-databricks, sparkr

Solution

