首页 > 解决方案 > 一个 BigQueryOperator 的模板化 destination_dataset_table arg 作为另一个模板化的 From

问题描述

我试图在 ETL 管道中将一堆 BigQuery SQL 命令链接在一起,其中一些输出和输入将被加上时间戳。

from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

DAG_NAME = 'foo'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(7),
    'email': ['xxx@xxx.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    dag_id="blah",
    default_args=default_args,
    schedule_interval=None,
    template_searchpath=["/usr/local/airflow/dags/xxx/sql"])


GOOGLE_PROJECT_ID = 'xxx'
DATASET_ID = 'xxx'
first_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "first_output_" + '{{ ds_nodash }}'
second_output = GOOGLE_PROJECT_ID + ":" + DATASET_ID + "." + "second_output"
GOOGLE_CLOUD_PLATFORM_CONNECTION_ID="google_cloud_default"


first_op = BigQueryOperator(
    task_id='first_output',
    dag=dag,
    bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
    bql="XXX.sql",
    use_legacy_sql=True,
    allow_large_results=True,
    destination_dataset_table=first_output # {{ ds }} gets substituted because destination_dataset_table is a templated field
)

second_op = BigQueryOperator(
    task_id='second_op',
    dag=dag,
    bigquery_conn_id=GOOGLE_CLOUD_PLATFORM_CONNECTION_ID,
    bql="XXX_two.sql", # XXX_two.sql contains a {{ params.input_table }} reference
    params={'input_table': first_op.destination_dataset_table},
    use_legacy_sql=True,
    allow_large_results=True,
    destination_dataset_table=second_output

)

second_op.set_upstream(first_op)

XXX_two.sql的内容:

SELECT * FROM [{{ params.input_table }}

测试通过:

airflow test blah second_op  2015-06-01

我当前的错误是(也在生产中)

Exception: BigQuery job failed. Final error was: {'reason': 'invalid', 'location': BLAH, 'message': 'Invalid table name: xxx:xx.first_output_{{ ds_nodash }}'}. 

如何在操作员执行之外访问模板化字段?

标签: google-bigqueryairflow

解决方案


该字段肯定是模板化的,从源代码destination_dataset_table中可以看出(1.9,没有提供版本,所以我采用了最新的):

template_fields = ('bql', 'destination_dataset_table')

我会将创建字符串更改为:

first_output = "[{project}:{dataset}.first_output_{{{{ ds_nodash }}}}]".format(
    project=GOOGLE_PROJECT_ID,
    dataset=DATASET_ID)

四个花括号应该变成两个,结果字符串应该看起来像

[my_project:my_dataset.first_output_{{ ds_nodash }}]

现在ds_nodash在使用时应该被解析destination_dataset_table

请注意,我还[ ]为遗留语句添加了所需的括号。我不确定这是否也可能与缺少的括号有关。

编辑

正如@mask 正确指出的那样,您使用的是我一开始没有看到的first_op字符串second_op params

由于以下原因,这不起作用:

  • first_op 不应该提供字符串,但您应该使用first_output- 我仍然想知道为什么这首先会起作用
  • 如果您从任务中提取字符串,您将不会获得呈现的字符串,但始终是原始的模板字符串 *如果您不确定字段已被处理(如 Mask 所述)
  • params根本没有模板化,因此不会正确更新

这些是我能想到的解决方案:

  • 派生您自己的BigDataOperator并添加params到模板化字段(如果可行,它是一个字典)
  • 或者扩展 thexxx_two.sql使其不会使用params.input_table但也first_output. 由于您希望first_output在模板中可用,您必须首先将其添加到 DAG 参数user_defined_macros中。

要了解有关这些解决方案的更多信息,请查看以下相关问题:Make custom Airflow macros expand other macros


推荐阅读