google-bigquery - 带有架构更改的 Airflow Pipeline CSV 到 BigQuery
问题描述
背景
我需要设计一个 Airflow 管道来将 CSV 加载到 BigQuery 中。
我知道 CSV 经常有一个不断变化的架构。加载第一个文件后,架构可能是
id | ps_1 | ps_1_value
当第二个文件登陆并加载它时,它可能看起来像
id | ps_1 | ps_1_value | ps_1 | ps_2_value
.
问题
处理此问题的最佳方法是什么?
我接近这个的第一个想法是
- 加载第二个文件
- 将架构与当前表进行比较
- 更新表,添加两列 (ps_2, ps_2_value)
- 插入新行
我会在 PythonOperator 中执行此操作。
如果文件 3 进入并且看起来id | ps_2 | ps_2_value
我会填写缺失的列并进行插入。
感谢您的反馈。
解决方案
加载两个先前的文件后example_data_1.csv
,example_data_2.csv
我可以看到这些字段被插入到正确的列中,并根据需要添加了新列。
编辑:灯泡时刻意识到schema_update_options
存在。见这里:https ://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.job.SchemaUpdateOption.html
csv_to_bigquery = GoogleCloudStorageToBigQueryOperator(
task_id='csv_to_bigquery',
google_cloud_storage_conn_id='google_cloud_default',
bucket=airflow_bucket,
source_objects=['data/example_data_3.csv'],
skip_leading_rows=1,
bigquery_conn_id='google_cloud_default',
destination_project_dataset_table='{}.{}.{}'.format(project, schema, table),
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
schema_update_options=['ALLOW_FIELD_RELAXATION', 'ALLOW_FIELD_ADDITION'],
autodetect=True,
dag=dag
)
推荐阅读
- c++ - 当基类中的虚拟不相关方法导致 MSVC 上的编译失败时,模板类方法实例化
- java - IntelliJ java.lang.ClassNotFoundException: com.mysql.jdbc.Driver 尽管在 IML 中有它
- angular - 将一项服务插入另一项服务时出错
- haskell - Haskell 中并行 N-Body 的性能不佳
- visual-studio - 需要复制现有 TFS 项目/解决方案的代码来创建新的 TFS 项目/解决方案
- javascript - 在Javascript的构造函数中访问不是用'this'关键字创建的变量
- java - Java Servelet 3.0 文件上传到输入流 - 无需创建中间文件夹或文件
- web-services - 在 ColdFusion 8 中无法使用 https Web 服务
- javascript - 使用 Ajax 用 php 结果更新 javascript 变量
- javascript - 在替代方案的情况下进行 Joi 验证