首页 > 解决方案 > 带有架构更改的 Airflow Pipeline CSV 到 BigQuery

问题描述

背景

我需要设计一个 Airflow 管道来将 CSV 加载到 BigQuery 中。

我知道 CSV 经常有一个不断变化的架构。加载第一个文件后,架构可能是

id | ps_1 | ps_1_value

当第二个文件登陆并加载它时,它可能看起来像

id | ps_1 | ps_1_value | ps_1 | ps_2_value.

问题

处理此问题的最佳方法是什么?


我接近这个的第一个想法是

  1. 加载第二个文件
  2. 将架构与当前表进行比较
  3. 更新表,添加两列 (ps_2, ps_2_value)
  4. 插入新行

我会在 PythonOperator 中执行此操作。

如果文件 3 进入并且看起来id | ps_2 | ps_2_value我会填写缺失的列并进行插入。

感谢您的反馈。

标签: google-bigqueryairflow

解决方案


加载两个先前的文件后example_data_1.csvexample_data_2.csv我可以看到这些字段被插入到正确的列中,并根据需要添加了新列。

编辑:灯泡时刻意识到schema_update_options存在。见这里:https ://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.SchemaUpdateOption.html

csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
    task_id='csv_to_bigquery',
    google_cloud_storage_conn_id='google_cloud_default',
    bucket=airflow_bucket,
    source_objects=['data/example_data_3.csv'],
    skip_leading_rows=1,
    bigquery_conn_id='google_cloud_default',    
    destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
    source_format='CSV',
    create_disposition='CREATE_IF_NEEDED',
    write_disposition='WRITE_APPEND',
    schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
    autodetect=True,
    dag=dag
)

在此处输入图像描述


推荐阅读