首页 > 解决方案 > Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据

问题描述

首先,我使用 MySQL 查询从生产数据库中获取所有数据,然后将该数据NEW LINE DELIMITED JSON存储在谷歌云存储中,我想要做的是:
1. 检查表是否存在
2. 如果表不存在,使用创建表自动检测架构
3. 存储数据

所有这些都将安排在气流中。真正让我困惑的是数字2,我怎么能在 Python 中做到这一点?或者气流可以自动执行此操作吗?

标签: pythonmysqljsongoogle-bigqueryairflow

解决方案


Airflow 可以自动执行此操作。如果需要,该create_disposition参数会创建表。该autodetect参数完全符合您的需要。这是针对Airflow 1.10.2的。

GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
    task_id='gcs_to_bq',
    bucket='test_bucket',
    source_objects=['folder1/*.csv', 'folder2/*.csv'],
    destination_project_dataset_table='dest_table',
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_TRUNCATE',
    bigquery_conn_id='bq-conn',
    google_cloud_storage_conn_id='gcp-conn',
    autodetect=True, # This uses autodetect
    dag=dag
)

推荐阅读