How to execute an Airflow operator with a while loop

Problem description

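Requirement: use a while loop to run the SQL query once for each date. For example, with a start date of August 25 and an end date of August 28, the BigQueryOperator should run first for August 25, then for August 26, and so on until it reaches August 28.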

Problem: with the DAG below, only the start-date query runs and then the job finishes. The BigQueryOperator never moves on to the next date.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
import datetime
from datetime import timedelta

default_args = {
    'owner': 'airflow',
    'start_date': datetime.datetime(2018, 8, 31),
    'email': ['xyz@xyz.com'],
    'email_on_failure': True,
    'retries': 1,
    'retry_delay': timedelta(minutes=10),
    'depends_on_past': False
}

dag = DAG('his_temp',default_args=default_args,schedule_interval=None)

date1 = datetime.date(2018, 8, 25)
date2 = datetime.date(2018, 8, 28)
day = datetime.timedelta(days=1)

while date1 <= date2:
    parameter = {
        'dataset': "projectname.finance",
        'historical_date': date1.strftime('%Y%m%d')
    }


    sqlpartition = BigQueryOperator(
        task_id='execute_sqlpartition',  # identical on every loop iteration
        use_legacy_sql=False,
        write_disposition='WRITE_TRUNCATE',
        allow_large_results=True,
        bql="sqlqueries/sqlpartition.sql",
        destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'),
        params=parameter,
        bigquery_conn_id='bigquery',
        dag=dag)

    # Runs when the DAG file is parsed, not after the query has finished.
    print("data loaded for " + parameter.get('historical_date'))

    date1 = date1 + day   
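A likely reason only the first date runs: every loop iteration creates a `BigQueryOperator` with the same `task_id='execute_sqlpartition'`, and Airflow requires task IDs to be unique within a DAG. Depending on the Airflow version, a duplicate `task_id` is either rejected with an error or only warned about, with just the first task kept. The pure-Python sketch below (it does not import Airflow) shows one way to derive a distinct `task_id` and destination table per date, plus the consecutive pairs you would chain with `>>` so the dates run in order. The helper `plan_partition_tasks` and the naming scheme `execute_sqlpartition_<YYYYMMDD>` are hypothetical.

```python
import datetime


def plan_partition_tasks(start, end, dataset="projectname.finance"):
    """Hypothetical helper: one (task_id, destination_table) pair per date,
    plus the (upstream, downstream) task_id pairs that keep dates in order."""
    tasks = []
    day = datetime.timedelta(days=1)
    current = start
    while current <= end:  # same loop condition as in the question
        suffix = current.strftime("%Y%m%d")
        # A distinct task_id per date, so each operator is a separate task.
        task_id = "execute_sqlpartition_" + suffix
        destination = dataset + ".date_partition_" + suffix
        tasks.append((task_id, destination))
        current += day
    # Consecutive task_id pairs; in the DAG each pair would become upstream >> downstream.
    edges = [(tasks[i][0], tasks[i + 1][0]) for i in range(len(tasks) - 1)]
    return tasks, edges


if __name__ == "__main__":
    tasks, edges = plan_partition_tasks(datetime.date(2018, 8, 25), datetime.date(2018, 8, 28))
    for task_id, destination in tasks:
        print(task_id, "->", destination)
    print(edges)
```

In the real DAG, each `task_id`/`destination` pair would feed one `BigQueryOperator`, and each edge would become `previous_op >> current_op`, so August 26 only starts after August 25 has succeeded.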

Tags: python, airflow

Solution


You can add a self-triggering operator at the end of your dependencies, something like the following:

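from airflow.operators.dagrun_operator import TriggerDagRunOperator

def trigger_check(context, dag_run_obj):
    # Returning dag_run_obj triggers a new run of trigger_dag_id;
    # returning None skips the trigger.
    if date1 <= date2:
        return dag_run_obj
    return None

trigger = TriggerDagRunOperator(
    task_id="test_trigger_dagrun",
    trigger_dag_id="his_temp",
    python_callable=trigger_check,
    # ... more arguments
)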
op1 >> op2 >> ... >> trigger

Once you trigger it the first time, it will loop through the dates until it reaches the date2 threshold. You'll have to be more careful about how the date is updated, for example by doing it in an ordered PythonOperator or something similar.
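Module-level variables such as `date1` are recomputed whenever the DAG file is parsed, so they do not carry over from one DAG run to the next. One way to hand the date from run to run, sketched here assuming Airflow 1.x's `TriggerDagRunOperator` (where the callable may set `dag_run_obj.payload`, which the triggered run receives as `dag_run.conf`), is to pass the next date explicitly. `next_date_payload`, `END_DATE`, and the `historical_date` conf key are hypothetical; the code runs without Airflow, using `SimpleNamespace` as a stand-in for the Airflow objects.

```python
import datetime
from types import SimpleNamespace

END_DATE = datetime.date(2018, 8, 28)  # plays the role of date2 in the answer


def next_date_payload(historical_date, end=END_DATE):
    """Given the YYYYMMDD date this run processed, return the conf for the
    next run, or None once the end date has been processed."""
    current = datetime.datetime.strptime(historical_date, "%Y%m%d").date()
    next_day = current + datetime.timedelta(days=1)
    if next_day > end:
        return None
    return {"historical_date": next_day.strftime("%Y%m%d")}


def trigger_check(context, dag_run_obj):
    """Hypothetical python_callable: the date comes from this run's conf,
    not from a module-level variable that is reset on every parse."""
    payload = next_date_payload(context["dag_run"].conf["historical_date"])
    if payload is None:
        return None  # nothing returned: no new run is triggered
    dag_run_obj.payload = payload  # becomes dag_run.conf of the next run
    return dag_run_obj


if __name__ == "__main__":
    # Simulate the chain of runs without Airflow: each run hands its date on.
    conf = {"historical_date": "20180825"}
    while conf is not None:
        print("running for", conf["historical_date"])
        context = {"dag_run": SimpleNamespace(conf=conf)}
        result = trigger_check(context, SimpleNamespace(payload=None))
        conf = result.payload if result is not None else None
```

Under these assumptions the first run would have to be started with a conf carrying the start date, for example `airflow trigger_dag -c '{"historical_date": "20180825"}' his_temp` in Airflow 1.x, and the query tasks would read the date from `dag_run.conf` rather than from a Python loop.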

