Airflow DAG using PythonOperator fails with "`python_callable` param must be callable" exception

Problem description

Broken DAG: [/opt/airflow/dags/my_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 179, in apply_defaults
    result = func(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 136, in __init__
    raise AirflowException('`python_callable` param must be callable')
airflow.exceptions.AirflowException: `python_callable` param must be callable

import airflow
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from datetime import datetime
from random import randint
from airflow.operators.bash import BashOperator

def _training_model():
    return randint(1,11)


def _choose_best_model(ti):
    # Pull the return values of the three training tasks from XCom.
    accuracies = ti.xcom_pull(task_ids=[
        'training_model_A',
        'training_model_B',
        'training_model_C',
    ])
    best_accuracy = max(accuracies)

    if best_accuracy > 8:
        return 'accurate'
    return 'inaccurate'

with DAG(
    dag_id="mobile_app_usage", start_date=datetime(2021,1,1),
    schedule_interval="@daily",catchup=False) as dag:

    training_model_A = PythonOperator(
        task_id = "training_model_A",
        python_callable=_training_model()
    )

    training_model_B = PythonOperator(
        task_id = "training_model_B",
        python_callable=_training_model()
    )

    training_model_C = PythonOperator(
        task_id = "training_model_B",
        python_callable=_training_model()
    )

    choose_best_model = BranchPythonOperator(
        task_id = "choose_best_model",
        python_callable= _choose_best_model()
    )

    accurate = BashOperator(
        task_id="accurate",
        bash_command="echo Accurate"
    )

    inaccurate = BashOperator(
        task_id="inaccurate",
        bash_command="echo Inaccurate"
    )

[Screenshot of my Airflow webserver]

Tags: python, airflow

Solution


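The `python_callable` parameter expects the callable itself (the function name without parentheses), not the result of calling it. Writing `_training_model()` calls the function while the DAG file is being parsed, so the operator receives its return value (an integer from `randint`), which is not callable; that is what raises the exception. Pass the function object instead, like this: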

...
    training_model_A = PythonOperator(
        task_id = "training_model_A",
        python_callable=_training_model
    )

    training_model_B = PythonOperator(
        task_id = "training_model_B",
        python_callable=_training_model
    )

    training_model_C = PythonOperator(
        task_id = "training_model_C",
        python_callable=_training_model
    )

    choose_best_model = BranchPythonOperator(
        task_id = "choose_best_model",
        python_callable= _choose_best_model
    )
...

FYI: in the corrected DAG snippet I also changed the `task_id` of the third task to "training_model_C", because it duplicated "training_model_B". In Airflow, `task_id` values must be unique within a DAG.
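Airflow keys each task by its `task_id` within the DAG, so one id cannot refer to two different tasks (in Airflow 2.x this should surface as a `DuplicateTaskIdFound` error when the file is parsed). A quick way to spot such a clash yourself is a sketch like the following; `find_duplicate_task_ids` is a hypothetical helper, not part of Airflow's API.

```python
from collections import Counter


def find_duplicate_task_ids(task_ids):
    """Return the task_ids that occur more than once (hypothetical helper)."""
    counts = Counter(task_ids)
    # Counter keeps first-seen order, so the result is deterministic.
    return [task_id for task_id, n in counts.items() if n > 1]


# task_ids as written in the question's DAG
original_ids = [
    "training_model_A",
    "training_model_B",
    "training_model_B",  # the third task reused the second task's id
    "choose_best_model",
    "accurate",
    "inaccurate",
]

# task_ids after renaming the third task
fixed_ids = [
    "training_model_A",
    "training_model_B",
    "training_model_C",
    "choose_best_model",
    "accurate",
    "inaccurate",
]

print(find_duplicate_task_ids(original_ids))  # ['training_model_B']
print(find_duplicate_task_ids(fixed_ids))     # []
```

Run against the ids from the question, it flags `training_model_B`; the renamed list comes back empty.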

