首页 > 解决方案 > Airflow - 从 BigQuery 动态生成任务,但任务在之前完成之前重复运行

问题描述

语境

我正在尝试使用 Composer、DataProc 和 BigQuery 在 Google Cloud Platform 上构建一个摄取管道。我在 BigQuery 中有一个表,其中包含数据源及其相关文件的记录。因此,如果我需要提取 5 个文件,那么 BigQuery 表中有 5 条记录。明天可能是不同数量的文件。因此,我考虑在我的 DAG 中动态构建任务。

高层设计如下:

此设置运行良好。我可以在 Airflow UI 中看到我的 DAG 和所有动态生成的任务。

编辑:只需添加更多细节。BigQuery 表将包含少于 25 条记录,因此无需担心查询表。每 30 秒查询一次表。其次,我只需要这个 DAG 每 4 小时左右运行一次。我不打算让我的作曲家在那段时间继续运行。我需要每 4 小时启动 Composer,运行一次DAG以处理所有可用文件,然后关闭。

问题

在执行这些 DataProc 任务时,大约几分钟后,Airflow 会刷新 DAG 并再次运行同一组任务。在 DataProc Jobs 控制台中,我看到同一任务的 2 个(有时是 3 个)实例处于运行状态。这是不可取的。

我试过的

我已经retries=0在任务级别和 DAG 上设置了catchup=Falsemax_active_runs=1schedule_interval='@once'。DAG 的默认参数也有retries=0.

我认为问题在于我从 BigQuery 中提取记录的部分是普通功能的一部分,而不是本身的任务。我没有把它放在一个任务中的原因是因为我找不到一个解决方案来将从 BigQuery 获取的结果传递到我必须循环它们的后续任务中。

我尝试调用 PythonOperator 并在其中执行Variable.set("df", df),希望可以循环,Variable.get("df")但这也没有成功。

下面分享相关代码。

def fetch_pending_files_from_bq():
    # fetch records from BigQuery and return as dataframe

default_args = {
    'start_date': yesterday,
    'default_timezone': 'utc',
    'retries': 0
}

dag = DAG(
    dagid,
    default_args=default_args,
    catchup=False,
    max_active_runs=1,
    description='DAG to ingest data',
    schedule_interval='@once'
)

start_dag = DummyOperator(task_id="start_dag", dag=dag)
end_dag = DummyOperator(task_id="end_dag", dag=dag)

pending_files_df = fetch_pending_files_from_bq()

for index, row in pending_files_df.iterrows():
    task = DataProcSparkOperator(
        dag=dag,
        task_id=row["file_name"],
        arguments=dataproc_args,
        region="us-east1",
        job_name="job_{}".format(task_id),
        dataproc_spark_jars=dataproc_jars,
        ....
        ....
    )

    task.set_upstream(start_dag)
    task.set_downstream(end_dag)

我得到了我想要的编排,唯一的问题是我的 DataProc 作业会自动重新运行。

任何想法表示赞赏。

标签: pythongoogle-bigqueryairflowgoogle-cloud-composer

解决方案


在深入研究设计时,我意识到 fetch_pending_files_from_bq 不是一项任务,因此每次刷新 dag 时都会执行它。这导致了多个查询,并且还导致意外创建重复任务。因此我放弃了这个设计。

我能够使用 subdags 解决这个问题。第一个 subdag 从 BQ 读取并写入 GCS。第二个 subdag 从 GCS 读取文件并动态创建任务。


推荐阅读