首页 > 解决方案 > 增加气流中特定 DAG 的超时

问题描述

我有一个 DAG,它需要很长时间才能执行 bigquery 操作。而且我总是收到错误'Broken DAG:[/home/airflow/gcs/dags/xyz.py] Timeout'我发现一些答案说我们必须增加airflow.cfg中的超时。但是这个想法不适合我的项目。是否有可能以某种方式增加特定 DAG 的超时时间?任何人请帮忙。谢谢你。

标签: airflowgoogle-cloud-composer

解决方案


是的,您可以dagrun_timeout在 Dag 上设置参数。

指定 DagRun 在超时/失败之前应该运行多长时间,以便可以创建新的 DagRun。超时仅对计划的 DagRuns 强制执行,并且仅在活动 DagRuns 的数量 == max_active_runs 时强制执行。

我们还可execution_timeout以为每个任务设置一个参数。

execution_timeout:允许执行此任务实例的最长时间,如果超过它将引发并失败。:type execution_timeout: datetime.timedelta

因此,如果其中一项任务是在 BigQuery 上运行查询,您可以使用类似

BigQueryOperator(sql=sql,
    destination_dataset_table={{ params.t_name }}),
    task_id='bq_query',
    bigquery_conn_id='my_bq_connection',
    use_legacy_sql=False,
    write_disposition='WRITE_TRUNCATE',
    create_disposition='CREATE_IF_NEEDED',
    query_params={'t_name': table_name},
    execution_timeout=datetime.timedelta(minutes=10)
    dag=dag)

推荐阅读