首页 > 解决方案 > 如何在运行类似的依赖子任务集的气流中设置并行任务

问题描述

我想在气流 dag 中设置以下内容

               ---> Env1 Runner--->File1Runner--->File2Runner
main_process |
               ---> Env2 Runner--->File1Runner--->File2Runner
         

有一个主进程,它将为每个环境并行运行两个子进程。每个子进程将运行相同系列的相关任务。Env1 Runner 将通过 XCOM 为 env1 设置变量,Env2 Runner 将通过 XCOM 为 env2 设置变量。如何设置具有不同 XCOM 变量值的并行任务,但运行相同的函数集

标签: airflow

解决方案


假设这些任务是PythonOperator,我建议使用外部文件作为库,您可以在流程的不同步骤中从中导入。使用该结构,您可以使用不同的输入(作为这些函数的参数)实现所需的内容,并从任务中调用它们。

这是一个简单的例子:

# lib folder > helpers.py
def function_step_1(arg1, arg2, **kwargs):
    # Do something
    from_xcom = kwargs["ti"].xcom_pull(task_ids="initial_task", key=arg1)
    # Do something with `from_xcom`
    return "something_useful_1"


def function_step_2(arg3, arg4, **kwargs):
    # Do something
    return "something_useful_2"


def function_step_3(arg5, arg6, **kwargs):
    # Do something
    return "something_useful_3"

# DAG implementation under dags folder
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# This is where you import the defined functions
from lib.helpers import function_step_1, function_step_2, function_step_3

with DAG(
    dag_id="dag_name",
    default_args={},  # Change this
    schedule_interval=None,  # Change this
) as dag:
    step_1 = PythonOperator(
        task_id="step_1",
        python_callable=function_step_1,
        provide_context=True,
        op_kwargs={
            "arg1": "something1",
            "arg2": "something2",
        },
    )

    step_2 = PythonOperator(
        task_id="step_2",
        python_callable=function_step_2,
        provide_context=True,
        op_kwargs={
            "arg3": "something3",
            "arg4": "something4",
        },
    )

    step_3 = PythonOperator(
        task_id="step_3",
        python_callable=function_step_3,
        provide_context=True,
        op_kwargs={
            "arg5": "something5",
            "arg6": "something6",
        },
    )

    # You're going to add two of this starting with an initial task
    step1 >> step2 >> step3

在函数function_step_1中,您XCom将从第一个任务开始处理。您可以使用一个参数来区分key您将使用哪个XCom(我提供了一个如何使用它的示例)。


推荐阅读