首页 > 解决方案 > While Loop 执行 Airflow 运算符而不创建许多任务

问题描述

要求:使用 while 循环为每个日期运行 SQL 查询。我只是想从开始日期到结束日期迭代一个任务。例如:有分区表schema.xyz_20200901。首先我想运行任务来选择这个分区表并更新 schema.table_20200901 然后在下一次迭代中它将选择 schema.xyz_20200902 并更新 schema.table_20200902 等等。

问题:我不想每个分区日期都有一个任务,我怎么能只有一个任务来做呢?

在此处输入图像描述

while p< (len(BQ_result)):
BigQueryOperator(
        task_id= o_task_id + str (BQ_result['fulldate'][p]),
        bql=sql_torun.replace("_DATEPARTITION_", str (BQ_result['fulldate'][p])),
        destination_dataset_table=DESTINATION_TABLE_STG.format(
        bigQ_dataset = dest_dataset_stg,
        bigQ_table_destination = dest_table_stg,
        ),
        create_disposition=create_disposition,
        write_disposition=write_disposition,
        allow_large_results=True,
        bigquery_conn_id=bq_connection.get('connection_id'),
        use_legacy_sql=False,
        pool = BigQueryOperator_pool,
        dag=dag,
        task_concurrency=1
    )

标签: pythonairflow

解决方案


您的用例应该通过创建动态操作符来解决。但理想情况下,我建议您使用诸如python operator之类的运算符,它提供了更多的灵活性和对流程的控制。


推荐阅读