google-bigquery - 一个 BigQueryOperator 的模板化 destination_dataset_table arg 作为另一个模板化的 From
问题描述
我试图在 ETL 管道中将一堆 BigQuery SQL 命令链接在一起,其中一些输出和输入将被加上时间戳。
from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
DAG_NAME = 'foo'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(7),
'email': ['xxx@xxx.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(
dag_id="blah",
default_args=default_args,
schedule_interval=None,
template_searchpath=["/usr/local/airflow/dags/xxx/sql"])
GOOGLE_PROJECT_ID = 'xxx'
DATASET_ID = 'xxx'
first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output_" + '{{ ds_nodash }}'
second_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "second_output"
GOOGLE_CLOUD_PLATFORM_CONNECTION_ID="google_cloud_default"
first_op = BigQueryOperator(
task_id='first_output',
dag=dag,
bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
bql="XXX.sql",
use_legacy_sql=True,
allow_large_results=True,
destination_dataset_table=first_output # {{ ds }} gets substituted because destination_dataset_table is a templated field
)
second_op = BigQueryOperator(
task_id='second_op',
dag=dag,
bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
bql="XXX_two.sql", # XXX_two.sql contains a {{ params.input_table }} reference
params={'input_table': first_op.destination_dataset_table},
use_legacy_sql=True,
allow_large_results=True,
destination_dataset_table=second_output
)
second_op.set_upstream(first_op)
XXX_two.sql的内容:
SELECT * FROM [{{ params.input_table }}
测试通过:
airflow test blah second_op 2015-06-01
我当前的错误是(也在生产中)
Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'location': BLAH, 'message': 'Invalid table name: xxx:xx.first_output_{{ ds_nodash }}'}.
如何在操作员执行之外访问模板化字段?
解决方案
该字段肯定是模板化的,从源代码destination_dataset_table
中可以看出(1.9,没有提供版本,所以我采用了最新的):
template_fields = ('bql', 'destination_dataset_table')
我会将创建字符串更改为:
first_output = "[{project}:{dataset}.first_output_{{{{ ds_nodash }}}}]".format(
project=GOOGLE_PROJECT_ID,
dataset=DATASET_ID)
四个花括号应该变成两个,结果字符串应该看起来像
[my_project:my_dataset.first_output_{{ ds_nodash }}]
现在ds_nodash
在使用时应该被解析destination_dataset_table
。
请注意,我还[ ]
为遗留语句添加了所需的括号。我不确定这是否也可能与缺少的括号有关。
编辑
正如@mask 正确指出的那样,您使用的是我一开始没有看到的first_op
字符串second_op
params
。
由于以下原因,这不起作用:
- first_op 不应该提供字符串,但您应该使用
first_output
- 我仍然想知道为什么这首先会起作用 - 如果您从任务中提取字符串,您将不会获得呈现的字符串,但始终是原始的模板字符串 *如果您不确定字段已被处理(如 Mask 所述)
params
根本没有模板化,因此不会正确更新
这些是我能想到的解决方案:
- 派生您自己的
BigDataOperator
并添加params
到模板化字段(如果可行,它是一个字典) - 或者扩展 the
xxx_two.sql
使其不会使用params.input_table
但也first_output
. 由于您希望first_output
在模板中可用,您必须首先将其添加到 DAG 参数user_defined_macros
中。
要了解有关这些解决方案的更多信息,请查看以下相关问题:Make custom Airflow macros expand other macros
推荐阅读
- vue.js - 在 VUE 中部署到服务器后,自定义本地字体不起作用
- kotlin - Kotlin 代码 - FlatMap 如何在这里工作?
- javascript - 如何使用 django 从服务器向客户端发送错误消息并将其打印到控制台?
- rust - 如何使用 spsheet carate 提取单元格的值
- python - 如何使用 matplotlib.pyplot 将不同的函数应用于同一个图?
- mysql - MySQL C API - 批量插入性能问题
- object-detection-api - `RuntimeError: Failed to allocate tensors` 在 Google Coral 上使用重新训练的对象检测模型时
- javascript - 如何获取 html 输入并使用外部 javascript 显示它
- c# - 如何在 Unity3D 中一一获取获取请求结果?
- javascript - 如何引用 Javascript 数组中的变量?