首页 > 解决方案 > 气流与 DAG 外部的 BigQuery 交互,而不使用 BigQueryOperators

问题描述

我正在努力为 Airflow 上的以下问题找到解决方案:

我在 BQ 上有一张包含产品列表的表格(定期增加)。每个产品在 BigQuery/GoogleCloud 上都有不同的项目。比方说:

产品 | 身份证 | 项目 ID | PARAM_1 | PARAM_2

我目前在 Jenkins 上的管道使用 for 循环为每个产品构建并行 DAG,并且运行良好。

当我翻译为 Airflow DAG 时,我能够实现以下目标:

...
product_params = {
    'Product1': {
        'project_id': 'product-1',
        'color': 'Blue'
    },
    'Product2': {
        'project_id': 'product-2',
        'color': 'Red'
    },...
}


my_dag = DAG(
    'My_Default_DAG',
    schedule_interval='@once',
    default_args=default_args
    )

dag_tasks = {}

with firebase_dag as dag:
    for product_name, p_params in product_params.items():
        query_params = {
            'product_name': product_name,
            'product_project': product_params['project_id'],
            'color': product_params['color'],
            'event_date': '2019-12-01',
            'event_date_suffix': '20191201'
        }

        dag_tasks[game] = {}

        dag_tasks[game]['step_1'] = BigQueryOperator(
                task_id="{0}_step_1".format(product_name),
                bql='sql_folder/step-1.sql',
                use_legacy_sql=False,
                destination_dataset_table="{0}.prod_dataset.step1Table_{1}".format(product_params['project_id'], query_params['event_date_suffix']),
                write_disposition='WRITE_TRUNCATE',
                params=query_params
            )
       ### following steps...

理想情况下,我想直接在 BigQuery 上查询我的产品参数。而且我已经在 bitbucket 上为它开发了一个 Python 库,还有一堆 Jenkins 广泛使用的其他方法。

有什么办法可以将该库导入气流并在我的 dags 中使用它?

否则,除了 BigQueryOperators 之外,还有其他方法可以构建与 bigquery 交互的方法吗?

标签: google-bigqueryairflowdirected-acyclic-graphs

解决方案


是的,您可以在 DAG 中使用您的库并将其与PythonOperator一起使用。


推荐阅读