google-bigquery - 气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators
问题描述
我正在努力为 Airflow 上的以下问题找到解决方案:
我在 BQ 上有一张包含产品列表的表格(定期增加)。每个产品在 BigQuery/GoogleCloud 上都有不同的项目。比方说:
产品 | 身份证 | 项目 ID | PARAM_1 | PARAM_2
我目前在 Jenkins 上的管道使用 for 循环为每个产品构建并行 DAG,并且运行良好。
当我翻译为 Airflow DAG 时,我能够实现以下目标:
...
product_params = {
'Product1': {
'project_id': 'product-1',
'color': 'Blue'
},
'Product2': {
'project_id': 'product-2',
'color': 'Red'
},...
}
my_dag = DAG(
'My_Default_DAG',
schedule_interval='@once',
default_args=default_args
)
dag_tasks = {}
with firebase_dag as dag:
for product_name, p_params in product_params.items():
query_params = {
'product_name': product_name,
'product_project': product_params['project_id'],
'color': product_params['color'],
'event_date': '2019-12-01',
'event_date_suffix': '20191201'
}
dag_tasks[game] = {}
dag_tasks[game]['step_1'] = BigQueryOperator(
task_id="{0}_step_1".format(product_name),
bql='sql_folder/step-1.sql',
use_legacy_sql=False,
destination_dataset_table="{0}.prod_dataset.step1Table_{1}".format(product_params['project_id'], query_params['event_date_suffix']),
write_disposition='WRITE_TRUNCATE',
params=query_params
)
### following steps...
理想情况下,我想直接在 BigQuery 上查询我的产品参数。而且我已经在 bitbucket 上为它开发了一个 Python 库,还有一堆 Jenkins 广泛使用的其他方法。
有什么办法可以将该库导入气流并在我的 dags 中使用它?
否则,除了 BigQueryOperators 之外,还有其他方法可以构建与 bigquery 交互的方法吗?
解决方案
是的,您可以在 DAG 中使用您的库并将其与PythonOperator一起使用。
推荐阅读
- android - 如何过滤 Firebase 回收站视图?
- dataframe - 计算 ms spark sql 的差异
- javascript - 计算器 - 点击一个数字,相同的数字会显示不止一次
- python - Apache2 和 Django - [wsgi:error] ImportError: cannot import name 'get_version' -> No module named 'django'
- huawei-mobile-services - “分析”选项卡中缺少 AdvancedAnalytics
- html - 移动标题不改变颜色
- sql - 将 .db 文件上传到 Azure SQL
- arrays - 如何使用带或不带互斥锁的多个 pthread 访问和修改相同的内存块?
- angular - 从小吃店关闭对话框
- diameter-protocol - 我应该在 CCR-U 的授权服务单元中包括什么?所有评级组或报告的评级组或请求的服务?