python - 我们如何使用 BigQueryCreateEmptyTableOperator 创建具有“Clustered by”列的表?
问题描述
我正在尝试在 GCP Cloud Composer 任务的 Python 脚本中,使用 BigQueryCreateEmptyTableOperator 创建一个带有聚簇(clustered by)列的表。我使用了“cluster_fields”参数来指定聚簇列,但它不起作用。请问正确的做法是什么?
下面是我正在使用的代码。
# Original attempt from the question: create an empty table, partitioned by day
# on `transaction_date`, using the stock BigQueryCreateEmptyTableOperator while
# also passing `cluster_fields`.
# NOTE(review): dataset_nm, table_nm, project_nm and dag are defined outside
# this snippet.
stop_op = BigQueryCreateEmptyTableOperator(
task_id='BigQuery_CreateTable',
dataset_id=dataset_nm,
table_id=table_nm,
project_id=project_nm,
schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
{"name": "distribution_name", "type": "STRING", "mode": "NULLABLE"},
{"name": "transaction_date", "type": "DATE", "mode": "NULLABLE"}],
time_partitioning={'type': 'DAY', 'field': 'transaction_date'},
# NOTE(review): this is the argument the question reports as not working.
cluster_fields='distribution_name',
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='bigquery_default',
autodetect=True,
dag=dag
)
解决方案
目前,最新版本的 Airflow 中不提供此功能(编写此答案时为 1.10.5)。
但是,您可以像下面这样自定义一个新的运算符,然后使用它。
from airflow.utils.decorators import apply_defaults
from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook, _parse_gcs_url
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
import json
class BQCreateEmptyTableWithClusteredFieldsOp(BigQueryCreateEmptyTableOperator):
    """Create an empty BigQuery table, optionally clustered on given columns.

    Behaves like ``BigQueryCreateEmptyTableOperator`` but additionally accepts
    ``cluster_fields`` and forwards it to the hook's ``create_empty_table``.

    :param dataset_id: dataset in which the table is created (templated).
    :param table_id: name of the table to create (templated).
    :param project_id: project that owns the dataset (templated).
    :param schema_fields: table schema as a list of field dicts. When empty
        and ``gcs_schema_object`` is set, the schema is loaded from GCS.
    :param gcs_schema_object: GCS URL of a JSON document holding the schema
        (templated). Only read when ``schema_fields`` is empty.
    :param time_partitioning: partitioning configuration forwarded to
        ``create_empty_table``; ``None`` is stored as ``{}``.
    :param bigquery_conn_id: connection id used to build the BigQuery hook.
    :param google_cloud_storage_conn_id: connection id used to build the GCS
        hook when the schema is downloaded.
    :param delegate_to: forwarded to both hooks.
    :param labels: forwarded to ``create_empty_table`` (templated).
    :param encryption_configuration: forwarded to ``create_empty_table``.
    :param cluster_fields: column name, or list of column names, to cluster
        the table by. A single string is wrapped into a one-element list.
    """

    template_fields = ('dataset_id', 'table_id', 'project_id',
                       'gcs_schema_object', 'labels')
    ui_color = '#f0eee4'

    # pylint: disable=too-many-arguments
    @apply_defaults
    def __init__(self,
                 dataset_id,
                 table_id,
                 project_id=None,
                 schema_fields=None,
                 gcs_schema_object=None,
                 time_partitioning=None,
                 bigquery_conn_id='bigquery_default',
                 google_cloud_storage_conn_id='google_cloud_default',
                 delegate_to=None,
                 labels=None,
                 encryption_configuration=None,
                 cluster_fields=None,
                 *args, **kwargs):
        # Start the MRO lookup *after* BigQueryCreateEmptyTableOperator so the
        # parent's own __init__ is skipped and only the generic operator
        # arguments are passed further up; every attribute is set below.
        # NOTE(review): presumably done because the parent's constructor does
        # not accept all of the arguments handled here - confirm.
        super(BigQueryCreateEmptyTableOperator, self).__init__(*args, **kwargs)

        self.project_id = project_id
        self.dataset_id = dataset_id
        self.table_id = table_id
        self.schema_fields = schema_fields
        self.gcs_schema_object = gcs_schema_object
        self.bigquery_conn_id = bigquery_conn_id
        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
        self.delegate_to = delegate_to
        self.time_partitioning = {} if time_partitioning is None else time_partitioning
        self.labels = labels
        self.encryption_configuration = encryption_configuration

        # Clustering columns are a list of names; accept a lone column name
        # given as a plain string so it is not forwarded as a bare string.
        if isinstance(cluster_fields, str):
            cluster_fields = [cluster_fields]
        self.cluster_fields = cluster_fields or []

    def execute(self, context):
        """Resolve the schema and create the empty table through the hook."""
        bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                               delegate_to=self.delegate_to)

        if not self.schema_fields and self.gcs_schema_object:
            # No inline schema: download the JSON schema document from GCS.
            gcs_bucket, gcs_object = _parse_gcs_url(self.gcs_schema_object)
            gcs_hook = GoogleCloudStorageHook(
                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
                delegate_to=self.delegate_to)
            schema_fields = json.loads(gcs_hook.download(
                gcs_bucket,
                gcs_object).decode("utf-8"))
        else:
            schema_fields = self.schema_fields

        conn = bq_hook.get_conn()
        cursor = conn.cursor()

        cursor.create_empty_table(
            project_id=self.project_id,
            dataset_id=self.dataset_id,
            table_id=self.table_id,
            schema_fields=schema_fields,
            time_partitioning=self.time_partitioning,
            labels=self.labels,
            cluster_fields=self.cluster_fields,
            encryption_configuration=self.encryption_configuration
        )
现在您可以按如下方式使用它:
# Create an empty table partitioned by day on `transaction_date` and clustered
# on `distribution_name`, using the custom operator.
# NOTE(review): dataset_nm, table_nm, project_nm and dag are defined outside
# this snippet.
stop_op = BQCreateEmptyTableWithClusteredFieldsOp(
    task_id='BigQuery_CreateTable',
    dataset_id=dataset_nm,
    table_id=table_nm,
    project_id=project_nm,
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
        {"name": "distribution_name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "transaction_date", "type": "DATE", "mode": "NULLABLE"},
    ],
    time_partitioning={'type': 'DAY', 'field': 'transaction_date'},
    # Clustering columns are passed as a list of column names.
    cluster_fields=['distribution_name'],
    bigquery_conn_id='bigquery_default',
    google_cloud_storage_conn_id='bigquery_default',
    # `autodetect` was dropped: it is not a constructor parameter of this
    # operator and would only be passed on to the base operator as an
    # unrecognized keyword argument.
    dag=dag
)
推荐阅读
- python - Django,覆盖url路由的静态路径
- java - 如何在 Eclipse 中使用 gson 将 ResultSet Query 转换为 JSON?
- python - 有没有比使用 .drop() 更好的方法通过从 DataFrame 中删除几列来保留几列?
- python - 为什么我不能在 Pycharm 上安装 pygame?
- java - Java 在评估表达式时的自动类型转换
- sql-server - 数据库项目中的 SQL Server 存储过程错误 [Visual Studio]
- python - 使用 Pandas 删除特定行时出错
- mysql - Magento 2 - 违反完整性约束:1062 重复条目 - 设置错误:从社区升级到企业后升级
- android - Android - 从代码更改微调器的字体大小
- python - 发布到 Google Analytics,在 Node 上工作,在 Python 上失败......为什么?