首页 > 解决方案 > 当任务名称和任务 ID 不匹配时,为什么气流会抛出异常?

问题描述

从airflow的文档和阅读互联网上的不同站点可以看出,在使用算子创建任务时,任务名称和task_id不需要匹配。例如,我正在使用以下代码:

from airflow import DAG
from airflow.utils import timezone
from airflow.operators.python_operator import PythonOperator
from pprint import pprint

default_args = {
    'owner':'me',
    'start_date': timezone.datetime(2019,1,1), 
    'provide_context': True
 }

dag = DAG(
    'etl', # dag_id
    default_args=default_args,
    schedule_interval=None,
    max_active_runs=2,
    catchup=False
)

def some_function(**context):
    start_time = default_args['start_date']
    print(start_time )

def another_function(**context):
    start_time = default_args['start_date']
    print(start_time )

with dag:
        some_task_name = PythonOperator(
            task_id='some_task_name_id',
            python_callable=some_function,
        )

        another_task_name = PythonOperator(
            task_id='another_task_name_id',
            python_callable=another_function,
        )

        some_task_name >> another_task_name

我正在使用以下命令进行测试:

气流测试 etl_script some_task_name 'args'

并得到以下错误:

Traceback (most recent call last):
  File "~/airflow/.venv/bin/airflow", line 32, in <module>
    args.func(args)
  File "~/airflow/.venv/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
    return f(*args, **kwargs)
  File "~/airflow/.venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 648, in test
    task = dag.get_task(task_id=args.task_id)
  File "~/airflow/.venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1070, in get_task
    raise AirflowException("Task {task_id} not found".format(task_id=task_id))
airflow.exceptions.AirflowException: Task some_task_name not found

但如果我使任务名称和 ID 相同,它就可以工作。为什么会这样?我觉得我错过了一些东西,但无法弄清楚。任何人都可以发光吗?

环境:气流版本:1.10.5,Ubuntu 18.04,Python 3.6

标签: pythonairflow

解决方案


发生错误不是因为 task_id 和 name 不同,而是因为我在test命令中使用了任务名称而不是 task_id。从服务器命令行测试 dag 任务时,命令布局为:

气流测试 dag_id task_id 日期

我想最好保持任务名称和 ID 相同。


推荐阅读