python - 气流外部任务传感器卡住
问题描述
我正在尝试让 Airflow ExternalTaskSensor 工作,但到目前为止还未能完成,它似乎总是卡在运行中并且永远不会完成,因此 DAG 可以继续执行下一个任务。
这是我用来测试的代码:
DEFAULT_ARGS = {
'owner': 'NAME',
'depends_on_past': False,
'start_date': datetime(2019, 9, 9),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False
}
external_watch_dag = DAG(
'DAG-External_watcher-Test',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=1),
schedule_interval=None
)
start_op = DummyOperator(
task_id='start_op',
dag=external_watch_dag
)
trigger_external = TriggerDagRunOperator(
task_id='trigger_external',
trigger_dag_id='DAG-Dummy',
dag=external_watch_dag
)
external_watch_op = ExternalTaskSensor(
task_id='external_watch_op',
external_dag_id='DAG-Dummy',
external_task_id='dummy_task',
check_existence=True,
execution_delta=timedelta(minutes=-1),
# execution_date_fn=datetime(2019, 9, 25),
execution_timeout=timedelta(minutes=30),
dag=external_watch_dag
)
end_op = DummyOperator(
task_id='end_op',
dag=external_watch_dag
)
start_op >> trigger_external >> external_watch_op >> end_op
# start_op >> [external_watch_op, trigger_external]
# external_watch_op >> end_op
# Below is the setup for the dummy DAG that is called above by the Trigger and watched by the TaskSensor
dummy_dag = DAG(
'DAG-Dummy',
default_args=DEFAULT_ARGS,
dagrun_timeout=timedelta(hours=1),
schedule_interval=None
)
dummy_task = BashOperator(
task_id='dummy_task',
bash_command='sleep 10',
dag=dummy_dag
)
我已经尝试通过多种方式调整此代码,但未通过 ExternalTaskSensor 获得任何成功。
有谁知道如何解决这个问题并使 ExternalTaskSensor 正常工作?我还读到,使用 ExternalTaskSensor 时,调度间隔可能会出现问题,问题的一部分是否可能是 DAG 都具有schedule_interval=None
?
我已经让它与两个设置为完全相同的 DAG 一起工作schedule_interval
,但这在生产中不起作用。目标是让主 DAG、external-watch-dag定期运行,并在运行期间触发该DAG-Dummy ,而DAG-Dummy本身具有schedule_interval=None
.
任何帮助是极大的赞赏。
解决方案
默认情况下,ExternalTaskSensor
将监视external_dag_id
与传感器 DAG 相同的执行日期。您可以设置传感器 dag 和外部 dag 之间的时间增量,execution_delta
以便它可以寻找正确execution_date
的监控。当两个 dag 都按计划运行时,这很有效,因为您确切地知道这个 timedelta。
问题:当手动或由另一个 dag 触发 dag 时,您无法确定这两个 dag 中任何一个的确切执行日期。
解决方法:因为你使用的是TriggerDagRunOperator
,所以可以设置execution_date
参数。这将确保您的 dag 和外部 dag 的执行日期相同。从文档:
execution_date (str or datetime.datetime) – dag 的执行日期(模板化)
因此,您的代码将如下所示:
trigger_external = TriggerDagRunOperator(
task_id='trigger_external',
trigger_dag_id='DAG-Dummy',
dag=external_watch_dag,
execution_date="{{ execution_date }}", # Use the template to get the current execution date
)
external_watch_op = ExternalTaskSensor(
task_id='external_watch_op',
external_dag_id='DAG-Dummy',
external_task_id='dummy_task',
check_existence=True,
execution_timeout=timedelta(minutes=30),
dag=external_watch_dag
)
推荐阅读
- git - 无法从 git 中的远程存储库推送或获取
- javascript - 如果语句不适用于 document.querySelector
- web-scraping - 刮 Highchart,缺少数据
- rust - 如何在 nom 中获取 N 位字节?
- python - 无法在命令行中运行 python 脚本 - urllib3.util.ssl_
- haskell - 处理结果的可能解决方案以“(RealFrac a,Integral a)=>a”结尾
- java - 在 Android Studio 中为通话记录添加搜索功能
- android - 基本 react-native 项目的运行错误
- python - 计算 Azure 数据湖 gen2 容器大小和读/写操作
- app-store - 列出 Apple Store 应用程序的所有应用程序内产品