首页 > 解决方案 > Scala - 在不执行的情况下获取具有阶段和任务的 DAG

问题描述

我正在寻找一种使用 RDD 获取 Scala Spark 应用程序的 DAG 的方法,包括阶段和任务。

我已经尝试过rdd.toDebugString,但它只显示 RDD 血统,而不是我正在寻找的 DAG。

我知道有显示 DAG 的 Web UI,但我想从代码中提取 DAG,就像explain函数对数据帧所做的那样。

标签: scalaapache-spark

解决方案


以下几点:

  • rdd.toDebugString仅适用于执行前的 RDD。

  • 执行 DAG是您可以在运行时通过Spark Web UI观察 RDD 和 Dataframes 的东西。查看新版本:https ://spark.apache.org/docs/3.0.0-preview/web-ui.html

  • 在执行之前,您可以运行一个.explainfor Dataframes。

  • 从一个好的来源:

Spark SQL EXPLAIN 运算符提供有关 sql 语句的详细计划信息,而无需实际运行它。您可以使用 Spark SQL EXPLAIN 运算符显示 Spark 执行引擎在执行任何查询时将生成和使用的实际执行计划。您可以使用此执行计划来优化您的查询。

数据框的一个简单示例:

import org.apache.spark.sql.Row     
val dfsFilename = "/FileStore/tables/sample_text.txt"
val readFileDF = spark.sparkContext.textFile(dfsFilename)  
val wordsDF = readFileDF.flatMap(_.split(" ")).toDF
val wcounts3 = wordsDF.filter(r => (r(0) != "Humpty") || (r(0) != "Dumpty"))
                      .groupBy("Value") // Note the value
                      .count().explain()

您为 Dataframe/Dataset 适当地标记语句,但不在 show() 上。

== Physical Plan ==
*(2) HashAggregate(keys=[Value#4], functions=[finalmerge_count(merge count#14L) AS count(1)#8L])
+- Exchange hashpartitioning(Value#4, 200), [id=#61]
   +- *(1) HashAggregate(keys=[Value#4], functions=[partial_count(1) AS count#14L])
      +- *(1) Filter <function1>.apply
         +- *(1) SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, java.lang.String, true], true, false) AS value#4]
            +- Scan[obj#3]

您的具体问题:不可能,也可能不完全有效,因为几乎没有需要考虑的优化。


推荐阅读