首页 > 解决方案 > 如何为 MySqlToGoogleCloudStorageOperator 使用 field_to_bigquery(self, field) 函数

问题描述

我正在尝试将一些数据从 Mysql 导出到 GCS,然后将导出的 json 加载到 BigQuery 表中,但是我遇到了一些 mysql 数据类型(例如 BIT 和 DateTime)的问题,因为在 json 结果中它们出现的格式不是适合 BigQuery

我想知道是否有一种方法可以将这些值转换为适合 bigquery 的格式,而无需进行额外的转换过程。

在 MySqlToGoogleCloudStorageOperator 的气流文档中(https://airflow.apache.org/_api/airflow/contrib/operators/mysql_to_gcs/index.html#airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator)我可以看到一些可能有帮助的函数:

field_to_bigquery(self, field)convert_type(self, value, schema_type)

但我不知道如何在我的代码中实现这一点。

我尝试过这样的事情:

MySQLtoGCS_TransTBComplete = MySqlToGoogleCloudStorageOperator(
        task_id='import,
        mysql_conn_id='mysql_default',
        google_cloud_storage_conn_id='google_cloud_storage_default',
        sql='SELECT * FROM table' 
        bucket='{bucket}',
        filename='file.json',
        dag=dag)

MySQLtoGCS_TransTBComplete.field_to_bigquery("datetime_field")

任何人都可以帮助我实现我想要的吗?

谢谢

标签: google-cloud-platformairflow

解决方案


您不能在代码中专门使用这两个函数。

  • field_to_bigquery如果您schema_filename在运算符中设置,将使用

  • convert_type在任务执行期间始终使用(您可以在源代码中看到sql_to_gcs.py

正如文档所说,似乎 MySQL 中的某些字段类型处理不正确(请参见此处:https://airflow.apache.org/docs/stable/_modules/airflow/contrib/operators/mysql_to_gcs.html#MySqlToGoogleCloudStorageOperator。 field_to_bigquery)。因此,您可能必须实现自定义运算符继承 MySqlToGoogleCloudStorageOperator类并覆盖这两个方法。


推荐阅读