airflow - AirFlow DAG 卡在运行状态
问题描述
我创建了一个 dag 并每天安排它。它每天都在排队,但任务实际上并没有运行。过去这里已经提出了这个问题,但答案对我没有帮助,所以似乎还有另一个问题。
我的代码在下面共享。我用注释替换了任务 t2 的 SQL。当我使用“气流测试...”在 CLI 上单独运行每个任务时,每个任务都会成功运行。
你能解释一下应该做些什么来让 DAG 运行吗?谢谢!
这是 DAG 代码:
from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner' : 'me',
'depends_on_past' : 'true',
'start_date' : datetime(2018, 06, 25),
'email' : ['myemail@moovit.com'],
'email_on_failure':True,
'email_on_retry':False,
'retries' : 2,
'retry_delay' : timedelta(minutes=5)
}
dag = DAG('my_agg_table',
default_args = default_args,
schedule_interval = "30 4 * * *"
)
t1 = BigQueryOperator(
task_id='bq_delete_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql='''
delete `my_project.agg.my_agg_table`
where date = '{{ macros.ds_add(ds, -1)}}'
''',
dag=dag)
t2 = BigQueryOperator(
task_id='bq_insert_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
allow_large_results=True,
bql='''
#standardSQL
Select ... the query continue here.....
''', destination_dataset_table='my_project.agg.my_agg_table',
dag=dag)
t1 >> t2
解决方案
通常很容易找出任务未运行的原因。在 Airflow 网页界面中时:
- 选择任何感兴趣的 DAG
- 现在点击任务
- 再次点击
Task Instance Details
- 在第一行有一个面板
Task Instance State
- 在旁边的框中
Reason
是运行任务的原因 - 或忽略任务的原因
检查第一个没有执行的任务通常是有意义的,因为我看到你有设置depends_on_past=True
,如果在错误的场景中使用可能会导致问题。
更多信息在这里:Airflow 1.9.0 正在排队但不启动任务
推荐阅读
- terraform - 在 openstack 上使用 terraform 旋转不同图像的多个实例
- c# - 如果任何属性为空,如何在匿名类中设置默认值;
- python - 是否可以根据 geopandas 中的属性添加不同的缓冲区?
- php - 用一个查询php更新多行
- reactjs - react-navigation v5 中不显示图标
- c++ - CPU瓶颈;处理具有许多非静态对象的 3D 场景渲染的简单方法
- javascript - 导航返回时无法在单选按钮之间导航
- python - 使用 Dataframe 中的列表理解检测和更改
- c# - 使用 WebClient (.Net 4.0) 在 Web API 上执行 PATCH
- python - 使用 selenium Python 库单击下拉菜单中的项目