首页 > 解决方案 > Airflow:有没有办法在 dag 之外将操作员分组?

问题描述

有没有办法设计一个在 dag 之外实现特定数据管道模式的 python 类,以便将此类用于需要此模式的所有数据管道?

示例:为了将数据从 Google Cloud Storage 加载到 Big Query,该过程可以是通过数据质量测试来验证摄取候选文件。然后尝试在 Big Query 中的原始表中加载数据,然后根据加载结果将文件分派到存档中或被拒绝的文件夹中。

做一次很容易,如果需要做1000次怎么办?我想弄清楚如何优化工程时间。

可以考虑使用 SubDag,但它在性能方面存在局限性,并且无论如何都会被弃用。

任务组需要成为要实施的 dag 的一部分https://github.com/apache/airflow/blob/1be3ef635fab635f741b775c52e0da7fe0871567/airflow/utils/task_group.py#L35

实现预期行为的一种方法可能是从利用动态 DAGing 的单个 python 文件生成 dag、任务组和任务

然而,在这个特定文件中使用的代码不能在代码库的某个地方重用。尽管 DRYness 与可理解性始终是一个权衡,但它反对 DRYness。

标签: pythonairflowgoogle-cloud-composer

解决方案


基于这篇文章

以下是如何解决这个问题:

您可以在气流中定义一个插件 ./plugins 让我们在 ./plugins/test_taskgroup.py 中创建一个示例任务组

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup

def hello_world_py():
        print('Hello World')

def build_taskgroup(dag: DAG) -> TaskGroup:
    
    with TaskGroup(group_id="taskgroup") as taskgroup:
        dummy_task = DummyOperator(
            task_id="dummy_task",
            dag=dag
        )
        python_task = PythonOperator(
            task_id="python_task",
            python_callable=hello_world_py,
            dag=dag
        )

    dummy_task >> python_task
    return taskgroup

您可以像这样在一个简单的 python DAG 中调用它:

from airflow.utils import task_group
from test_plugin import build_taskgroup
from airflow import DAG


with DAG(
    dag_id="modularized_dag",
    schedule_interval="@once",
    start_date=datetime(2021, 1, 1),
) as dag:

    task_group = build_taskgroup(dag)

这是结果 在此处输入图像描述


推荐阅读