google-bigquery - 如何重复 BigQueryOperator Dag 并将不同的日期传递给我的 sql 文件
问题描述
我有一个要使用 BigQueryOperator 运行的查询。每天,它将运行过去 21 天。sql 文件保持不变,但传递给文件的日期发生了变化。因此,例如今天它将针对今天的日期运行,然后针对昨天的日期重复,然后针对 2 天前重复,一直到 21 天前。所以它将在 2021 年 7 月 14 日运行,所以我需要将此日期传递给我的 sql 文件。然后它将运行 2021 年 7 月 13 日,我需要传递给我的 sql 文件的日期是 2021 年 7 月 13 日。我怎样才能让这个 dag 在一个日期范围内重复,并动态地将这个日期传递给 sql 文件。
在 BigQueryOperator 中,变量在“user_defined_macros, 部分中传递,所以我不知道如何更改我传递的日期。我想过循环一个日期数组,但我不知道如何传递那个日期到 BigQueryOperator 中链接的 sql 文件。
我的 sql 文件有 300 行长,所以我在下面提供了一个简单的示例,因为人们似乎想要一个。
有向无环图
with DAG(
dag_id,
schedule_interval='0 12 * * *',
start_date=datetime(2021, 1, 1),
template_searchpath='/opt/airflow/dags',
catchup=False,
user_defined_macros={"varsToPass":Var1
}
) as dag:
query_one = BigQueryOperator(
task_id='query_one',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table ='table',
write_disposition = 'WRITE_TRUNCATE'
)
sql文件
SELECT * FROM table WHERE date = {{CHANGING_DATE}}
解决方案
您的代码令人困惑,因为您描述了一个重复的模式,today,today-1 day, ..., today - 21 days
但是您的代码显示write_disposition = 'WRITE_TRUNCATE'
这意味着只有最后一个查询很重要,因为每个查询都会删除前一个查询的结果。由于没有提供更多信息,我假设您实际上是指在今天到今天 - 21 天之间运行一个查询。此外,您没有提及您所指的日期是 Airflowexecution_date
还是今天的日期。
如果是execution_date
你不需要传递任何参数。SQL需要是:
SELECT * FROM table WHERE date BETWEEN {{ execution_date }} AND
{{ execution_date - macros.timedelta(days=21) }}
如果是今天,那么您需要使用参数传递参数:
from datetime import datetime
query_one = BigQueryOperator(
task_id='query_one',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table ='table',
write_disposition = 'WRITE_TRUNCATE',
params={
"end": datetime.utcnow().strftime('%Y-%m-%d'),
"start": (datetime.now() - datetime.timedelta(days=21)).strftime('%Y-%m-%d')
}
)
然后在 SQL 中,您可以将其用作:
SELECT * FROM table WHERE date BETWEEN {{ params.start }} AND
{{ params.end }}
我想指出,如果您不使用,execution_date
那么我看不到从 Airflow 传递日期的价值。您可以直接使用 BigQuery 将查询设置为:
SELECT *
FROM table
WHERE date BETWEEN DATE_SUB(current_date(), INTERVAL 21 DAY) AND current_date()
如果我的假设不正确并且您想要运行 21 个查询,那么您可以按照您的描述使用循环来执行此操作:
from datetime import datetime, timedelta
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
a = []
for i in range(0, 21):
a.append(
BigQueryOperator(
task_id=f'query_{i}',
sql='/sql/something.sql',
use_legacy_sql=False,
destination_dataset_table='table',
write_disposition='WRITE_TRUNCATE', # This is probably wrong, I just copied it from your code.
params={
"date_value": (datetime.now() - timedelta(days=i)).strftime('%Y-%m-%d')
}
)
)
if i not in [0]:
a[i - 1] >> a[i]
然后在您/sql/something.sql
的查询中应该是:
SELECT * FROM table WHERE date = {{ params.date_value }}
如前所述,这将创建一个工作流程:
另请注意,BigQueryOperator
已弃用。您应该BigQueryExecuteQueryOperator
通过 Google 提供商使用
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
有关如何安装 Google 提供程序的更多信息,请参阅以下答案的第二部分。
推荐阅读
- html - 如何防止 Vue 组件上的默认标题设置?
- vba - 使用 VBA 进行多标准选择
- advanced-custom-fields - 我无法在插件 ACF 中为 wordpress 回显该字段
- javascript - 如何在一行中的地图箭头函数中返回扩展运算符
- c# - 使用 ApplicationInsights 通过 trackevent 发送代理详细信息
- java - Spring:从 REST 控制器下载文件
- ubuntu - systemd 不适用于 dbus 服务
- javascript - 电子邮件 Javascript 验证消息
- docplex - 在本地运行 Mining_pandas.IPYNB
- android - 在 Recycler 上实现 TouchListener