首页 > 解决方案 > Apache Airflow 在* Spark 作业中编排 DAG?

问题描述

只是想知道是否有人对使用 Airflow 在 Spark 作业中编排转换步骤的方法有任何想法。

大多数包含 Spark 作业的 Airflow 演示似乎都遵循定义 DAG 的相同过程,如下所示:

> A: trigger to pick up a file from an S3 
> B: move the file to a different S3 location
> C: run a spark job to create an aggregate extract
> D: send the extract out, e.g. via email

我想了解的是也使用 Airflow 来协调 Spark 作业本身的内部工作的可能性。为简单起见,ETL 火花作业可能看起来像

> C1: spark.read.csv(xxxx) 
> C2: transform dataframe with function transform_1() 
> C3: transform dataframe with function transform_2()
> C4: transform dataframe with function transform_3()
> C5: write dataframe to S3

所有这些 C 步骤都需要在同一个 Spark 作业中运行,因为它们在同一个 Spark DAG 上运行。如果作业中的每个任务都被视为 Airflow DAG 的一部分,则流程将是 A、B、C1、C2、C3、C4、C5、D。

根据正在处理的数据,可能需要选择不同的转换函数。如果我们可以将转换函数定义为 Airflow 任务,那就太好了,因此对于不同的数据集,DAG 可能是 A、B、C1、C2、C4、C5、C7、D。

有没有办法定义一个 DAG 并有效地将一个子 DAG 委托给 spark 并让 spark 驱动程序知道并能够与 Airflow 进行通信以确定接下来要运行哪个函数并将更新反馈给 Airflow 以便 DAG 是更新进度?

注意:我知道,在许多情况下,在 pyspark 中调用转换函数会很快,因为它只是构建一个 spark DAG,并且实际上不需要任何时间,直到 spark DAG 被执行 - 这个问题的目的是使用 Airflow 定义 spark DAG 的组成,并能够跟踪其进度。我也知道,如果其中一个 C 任务在 spark 中失败,在大多数情况下,所有前面的 C 任务(即整个子 DAG)都需要重新运行。

非常感谢您的阅读,我期待听到您的想法。

戴夫

标签: apache-sparkpysparkairflow

解决方案


推荐阅读