airflow - Airflow - 在同一个 DAG 中使用 TaskGroup 和 PythonBranchOperator
问题描述
我目前正在使用 Airflow Taskflow API 2.0。我遇到了结合使用 TaskGroup 和 BranchPythonOperator 的问题。
下面是我的代码:
import airflow
from airflow.models import DAG
from airflow.decorators import task, dag
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator, PythonOperator
from airflow.operators.python import task, get_current_context
from random import randint
from airflow.utils.task_group import TaskGroup
default_args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
@task
def dummy_task():
return {}
@task
def task_b():
return {}
@task
def task_c():
return {}
def final_step():
return {}
def get_tasks(**kwargs):
task = 'task_a'
return task
with DAG(dag_id='branch_dag',
default_args=default_args,
schedule_interval=None) as dag:
with TaskGroup('task_a') as task_a:
obj = dummy_task()
tasks = BranchPythonOperator(
task_id='check_api',
python_callable=get_tasks,
provide_context=True
)
final_step = PythonOperator(
task_id='final_step',
python_callable=final_step,
trigger_rule='one_success'
)
b = task_b()
c = task_c()
tasks >> task_a >> final_step
tasks >> b >> final_step
tasks >> c >> final_step
当我触发此 DAG 时,我在 check_api 任务中收到以下错误:
airflow.exceptions.TaskNotFound:任务task_a未找到
是否可以将 TaskGroup 与 BranchPythonOperator 结合使用?
谢谢,
解决方案
推荐阅读
- machine-learning - 我可以先对同一数据集应用“分类”,然后再应用“回归”吗?
- reactjs - OverlayTrigger - findDOMNode 在 StrictMode 中已弃用
- c# - OnRoomListUpdate 不工作,但 OnCreatedRoom 工作正常
- php - PHP 当重写页面 urls 如何删除或重定向 category.php?cat_id=2 urls 到重写的 urls?
- plotly-dash - 使用 html.Script 将 Dash (Plotly) 连接到 API
- c# - 类型或命名空间名称不存在。但仅在构建服务器上
- eventemitter - 为什么 vuejs3 应用程序中的发射器不捕获事件?
- mathjax - 如何在 MathJax 中正确渲染带有变音符号的字母?
- swift - 如何制作条形图?迅速
- java - 使用 fromRequest 的 Spring Cloud Contracts 算术函数