首页 > 解决方案 > 气流外部任务传感器卡​​住

问题描述

我正在尝试让 Airflow ExternalTask​​Sensor 工作,但到目前为止还未能完成,它似乎总是卡在运行中并且永远不会完成,因此 DAG 可以继续执行下一个任务。

这是我用来测试的代码:


DEFAULT_ARGS = {
    'owner': 'NAME',
    'depends_on_past': False,
    'start_date': datetime(2019, 9, 9),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}

external_watch_dag = DAG(
    'DAG-External_watcher-Test',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

start_op = DummyOperator(
    task_id='start_op',
    dag=external_watch_dag
)


trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag
)

external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_delta=timedelta(minutes=-1),
    # execution_date_fn=datetime(2019, 9, 25),
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)

end_op = DummyOperator(
    task_id='end_op',
    dag=external_watch_dag
)

start_op >> trigger_external >> external_watch_op >> end_op
# start_op >> [external_watch_op, trigger_external]
# external_watch_op >> end_op


# Below is the setup for the dummy DAG that is called above by the Trigger and watched by the TaskSensor
dummy_dag = DAG(
    'DAG-Dummy',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=1),
    schedule_interval=None
)

dummy_task = BashOperator(
    task_id='dummy_task',
    bash_command='sleep 10',
    dag=dummy_dag
)

我已经尝试通过多种方式调整此代码,但未通过 ExternalTask​​Sensor 获得任何成功。

有谁知道如何解决这个问题并使 ExternalTask​​Sensor 正常工作?我还读到,使用 ExternalTask​​Sensor 时,调度间隔可能会出现问题,问题的一部分是否可能是 DAG 都具有schedule_interval=None

我已经让它与两个设置为完全相同的 DAG 一起工作schedule_interval,但这在生产中不起作用。目标是让主 DAG、external-watch-dag定期运行,并在运行期间触发该DAG-Dummy ,而DAG-Dummy本身具有schedule_interval=None.

任何帮助是极大的赞赏。

标签: pythonairflow-schedulerairflow

解决方案


默认情况下,ExternalTaskSensor将监视external_dag_id与传感器 DAG 相同的执行日期。您可以设置传感器 dag 和外部 dag 之间的时间增量,execution_delta以便它可以寻找正确execution_date的监控。当两个 dag 都按计划运行时,这很有效,因为您确切地知道这个 timedelta。

问题:当手动或由另一个 dag 触发 dag 时,您无法确定这两个 dag 中任何一个的确切执行日期。

解决方法:因为你使用的是TriggerDagRunOperator,所以可以设置execution_date参数。这将确保您的 dag 和外部 dag 的执行日期相同。从文档

execution_date (str or datetime.datetime) – dag 的执行日期(模板化)

因此,您的代码将如下所示:

trigger_external = TriggerDagRunOperator(
    task_id='trigger_external',
    trigger_dag_id='DAG-Dummy',
    dag=external_watch_dag,
    execution_date="{{ execution_date }}",  # Use the template to get the current execution date
)
external_watch_op = ExternalTaskSensor(
    task_id='external_watch_op',
    external_dag_id='DAG-Dummy',
    external_task_id='dummy_task',
    check_existence=True,
    execution_timeout=timedelta(minutes=30),
    dag=external_watch_dag
)

推荐阅读