airflow - 如何在运行类似的依赖子任务集的气流中设置并行任务
问题描述
我想在气流 dag 中设置以下内容
---> Env1 Runner--->File1Runner--->File2Runner
main_process |
---> Env2 Runner--->File1Runner--->File2Runner
有一个主进程,它将为每个环境并行运行两个子进程。每个子进程将运行相同系列的相关任务。Env1 Runner 将通过 XCOM 为 env1 设置变量,Env2 Runner 将通过 XCOM 为 env2 设置变量。如何设置具有不同 XCOM 变量值的并行任务,但运行相同的函数集
解决方案
假设这些任务是PythonOperator
,我建议使用外部文件作为库,您可以在流程的不同步骤中从中导入。使用该结构,您可以使用不同的输入(作为这些函数的参数)实现所需的内容,并从任务中调用它们。
这是一个简单的例子:
# lib folder > helpers.py
def function_step_1(arg1, arg2, **kwargs):
# Do something
from_xcom = kwargs["ti"].xcom_pull(task_ids="initial_task", key=arg1)
# Do something with `from_xcom`
return "something_useful_1"
def function_step_2(arg3, arg4, **kwargs):
# Do something
return "something_useful_2"
def function_step_3(arg5, arg6, **kwargs):
# Do something
return "something_useful_3"
# DAG implementation under dags folder
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# This is where you import the defined functions
from lib.helpers import function_step_1, function_step_2, function_step_3
with DAG(
dag_id="dag_name",
default_args={}, # Change this
schedule_interval=None, # Change this
) as dag:
step_1 = PythonOperator(
task_id="step_1",
python_callable=function_step_1,
provide_context=True,
op_kwargs={
"arg1": "something1",
"arg2": "something2",
},
)
step_2 = PythonOperator(
task_id="step_2",
python_callable=function_step_2,
provide_context=True,
op_kwargs={
"arg3": "something3",
"arg4": "something4",
},
)
step_3 = PythonOperator(
task_id="step_3",
python_callable=function_step_3,
provide_context=True,
op_kwargs={
"arg5": "something5",
"arg6": "something6",
},
)
# You're going to add two of this starting with an initial task
step1 >> step2 >> step3
在函数function_step_1
中,您XCom
将从第一个任务开始处理。您可以使用一个参数来区分key
您将使用哪个XCom
(我提供了一个如何使用它的示例)。
推荐阅读
- localization - SCORM包(电子学习课程)本地化
- gnuplot - gnuplot 调色板随缩放改变颜色
- python - 如何计算盒子分类/包装问题中所需的队列数?
- javascript - 解决方案/黑客向浏览器呈现非关闭标签
- python - 使用太多运算符验证后缀
- eclipse - 将 selenium webdriver 代码推送到 Bitbucket 时出现授权错误
- javascript - 如果动态按钮发生变化,如何从动态按钮获取数据属性
- python - 使用 anaconda 安装后如何访问库链接?
- javascript - 如何在此函数中包含 window.load 或 $( document ).ready() ?
- python - 如何限制keras范围内的权重