python - Bigquery:如果不存在则创建表并使用 Python 和 Apache AirFlow 加载数据
问题描述
首先,我使用 MySQL 查询从生产数据库中获取所有数据,然后将该数据NEW LINE DELIMITED JSON
存储在谷歌云存储中,我想要做的是:
1. 检查表是否存在
2. 如果表不存在,使用创建表自动检测架构
3. 存储数据
所有这些都将安排在气流中。真正让我困惑的是数字2
,我怎么能在 Python 中做到这一点?或者气流可以自动执行此操作吗?
解决方案
Airflow 可以自动执行此操作。如果需要,该create_disposition
参数会创建表。该autodetect
参数完全符合您的需要。这是针对Airflow 1.10.2的。
GCS_to_BQ = GoogleCloudStorageToBigQueryOperator(
task_id='gcs_to_bq',
bucket='test_bucket',
source_objects=['folder1/*.csv', 'folder2/*.csv'],
destination_project_dataset_table='dest_table',
source_format='CSV',
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_TRUNCATE',
bigquery_conn_id='bq-conn',
google_cloud_storage_conn_id='gcp-conn',
autodetect=True, # This uses autodetect
dag=dag
)
推荐阅读
- rust - 如何在不触发生命周期问题且不使用拥有值的情况下避免 E0637 在特征绑定中?
- python - Raspbian Buster - 我使用的是哪个 Python 版本?
- spring - SPRING MVC:在一个方法中定义的 HttpSession 对另一种方法不可用
- swift - SwiftUI:在列表中时始终激活 NavigationLink
- json - JSON 中的动态未知字段
- flutter - Flutter 缓存管理器:清除特定 url 的缓存
- python - 使用 Python 三元条件运算符执行多个操作
- xml - 将没有域的名称提取到新的 xpath 字段中
- wso2 - 如何解决从 WSO2 API Manager Store 执行 HTTPS 调用的 101500 错误消息?我要设置证书吗?
- javascript - 如何在laravel中达到条件后禁用按钮