首页 > 解决方案 > 如何重复 BigQueryOperator Dag 并将不同的日期传递给我的 sql 文件

问题描述

我有一个要使用 BigQueryOperator 运行的查询。每天,它将运行过去 21 天。sql 文件保持不变,但传递给文件的日期发生了变化。因此,例如今天它将针对今天的日期运行,然后针对昨天的日期重复,然后针对 2 天前重复,一直到 21 天前。所以它将在 2021 年 7 月 14 日运行,所以我需要将此日期传递给我的 sql 文件。然后它将运行 2021 年 7 月 13 日,我需要传递给我的 sql 文件的日期是 2021 年 7 月 13 日。我怎样才能让这个 dag 在一个日期范围内重复,并动态地将这个日期传递给 sql 文件。

在 BigQueryOperator 中,变量在“user_defined_macros, 部分中传递,所以我不知道如何更改我传递的日期。我想过循环一个日期数组,但我不知道如何传递那个日期到 BigQueryOperator 中链接的 sql 文件。

我的 sql 文件有 300 行长,所以我在下面提供了一个简单的示例,因为人们似乎想要一个。

有向无环图

with DAG(
    dag_id,
    schedule_interval='0 12 * * *',
    start_date=datetime(2021, 1, 1),
    template_searchpath='/opt/airflow/dags',
    catchup=False,
    user_defined_macros={"varsToPass":Var1
    }

) as dag:
    query_one = BigQueryOperator(
        task_id='query_one',
        sql='/sql/something.sql',
        use_legacy_sql=False,
        destination_dataset_table ='table',
        write_disposition = 'WRITE_TRUNCATE'
        
    )

sql文件

SELECT * FROM table WHERE date = {{CHANGING_DATE}}

标签: google-bigqueryairflow

解决方案


您的代码令人困惑,因为您描述了一个重复的模式,today,today-1 day, ..., today - 21 days但是您的代码显示write_disposition = 'WRITE_TRUNCATE'这意味着只有最后一个查询很重要,因为每个查询都会删除前一个查询的结果。由于没有提供更多信息,我假设您实际上是指在今天到今天 - 21 天之间运行一个查询。此外,您没有提及您所指的日期是 Airflowexecution_date还是今天的日期。

如果是execution_date你不需要传递任何参数。SQL需要是:

SELECT * FROM table WHERE date BETWEEN {{ execution_date }} AND
{{ execution_date - macros.timedelta(days=21) }}

如果是今天,那么您需要使用参数传递参数:

from datetime import datetime
query_one = BigQueryOperator(
    task_id='query_one',
    sql='/sql/something.sql',
    use_legacy_sql=False,
    destination_dataset_table ='table',
    write_disposition = 'WRITE_TRUNCATE',
    params={
            "end": datetime.utcnow().strftime('%Y-%m-%d'),
            "start": (datetime.now() - datetime.timedelta(days=21)).strftime('%Y-%m-%d')
    }
    
)

然后在 SQL 中,您可以将其用作:

SELECT * FROM table WHERE date BETWEEN {{ params.start }} AND
{{ params.end }}

我想指出,如果您不使用,execution_date那么我看不到从 Airflow 传递日期的价值。您可以直接使用 BigQuery 将查询设置为:

SELECT *
FROM table
WHERE date BETWEEN DATE_SUB(current_date(), INTERVAL 21 DAY) AND current_date()

如果我的假设不正确并且您想要运行 21 个查询,那么您可以按照您的描述使用循环来执行此操作:

from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
a = []
for i in range(0, 21):
    a.append(
        BigQueryOperator(
            task_id=f'query_{i}',
            sql='/sql/something.sql',
            use_legacy_sql=False,
            destination_dataset_table='table',
            write_disposition='WRITE_TRUNCATE',  # This is probably wrong, I just copied it from your code.
            params={
                "date_value": (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
            }
        )

    )
    if i not in [0]:
        a[i - 1] >> a[i]

然后在您/sql/something.sql的查询中应该是:

SELECT * FROM table WHERE date = {{ params.date_value }}

如前所述,这将创建一个工作流程:

在此处输入图像描述

另请注意,BigQueryOperator已弃用。您应该BigQueryExecuteQueryOperator通过 Google 提供商使用

from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator

有关如何安装 Google 提供程序的更多信息,请参阅以下答案的第二部分。


推荐阅读