python - 如何使用 While Loop 执行 Airflow 算子
问题描述
要求:使用 while 循环为每个日期运行 SQL 查询。例如:开始日期选择为 8 月 25 日,结束日期选择为 8 月 28 日。然后 BigQueryOperator 首先运行为 8 月 25 日,然后是 8 月 26 日,依此类推,直到我们到达 8 月 28 日。
问题:在下面的 DAG 中,它只执行开始日期查询,然后完成作业。它甚至不执行/迭代 BigQueryOperator 到下一个日期等等。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import date, datetime, timedelta
import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime.datetime(2018, 8, 31),
'email': ['xyz@xyz.com'],
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=10),
'depends_on_past': False
}
dag = DAG('his_temp',default_args=default_args,schedule_interval=None)
date1 = datetime.date(2018, 8, 25)
date2 = datetime.date(2018, 8, 28)
day = datetime.timedelta(days=1)
while date1 <= date2:
parameter = {
'dataset': "projectname.finance",
'historical_date': date1.strftime('%Y%m%d')
}
sqlpartition = BigQueryOperator(
task_id='execute_sqlpartition',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql="sqlqueries/sqlpartition.sql",
destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'),
params=parameter,
bigquery_conn_id='bigquery',
dag=dag)
print "data loaded for "+ parameter.get('historical_date')
date1 = date1 + day
解决方案
You can add a self triggering operator at the end of your dependancies. Something like the following:
def trigger_check(context, dag_run_obj):
if date1 <= date2:
return dag_run_obj
trigger = TriggerDagRunOperator(
task_id="test_trigger_dagrun",
trigger_dag_id="his_temp",
python_callable=trigger_check,
... more arguments
)
op1 >> op2 >> ... >> trigger
Once you trigger it a first time, it will loop through dates until it reaches the date2 threshold. You'll have to be more careful with updating the date by making that an ordered PythonOperator or something like that
推荐阅读
- php - 强制wordpress页面刷新
- python - 如何用 Python 做一个合适的词搜索器?
- google-chrome - WebDriverException:未知错误:尝试启动 Chrome 浏览器时 DevToolsActivePort 文件不存在
- ios - iPhone X UITableViewController 带有固定底栏显示不透明工具栏下方的表格内容
- akka-stream - akka 流如何处理它自己产生源的多个源
- data-structures - 二维加速结构
- javascript - 如何在 HTML 中使用导出
- c# - C# 为所有其他类更改一个类的字段的值
- vba - 电子表格中的运行时错误
- sql - 将变量传递到 sp_helptext