python - Airflow DAG with PythonOperator fails with "`python_callable` param must be callable" exception
Problem description
Broken DAG: [/opt/airflow/dags/my_dag.py] Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 179, in apply_defaults
    result = func(self, *args, **kwargs)
  File "/home/airflow/.local/lib/python3.6/site-packages/airflow/operators/python.py", line 136, in __init__
    raise AirflowException('`python_callable` param must be callable')
airflow.exceptions.AirflowException: `python_callable` param must be callable
import airflow
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
from datetime import datetime
from random import randint
from airflow.operators.bash import BashOperator

def _training_model():
    return randint(1,11)

def _choose_best_model(ti):
    accuracies = ti.xcom_pull(task_ids =[
        'training_model_A'
        'training_model_B'
        'training_model_C'
    ])
    best_accuracy = max(accuracies)
    if (best_accuracy >8):
        return 'accurate'
    return 'inaccurate'

with DAG(
    dag_id="mobile_app_usage", start_date=datetime(2021,1,1),
    schedule_interval="@daily",catchup=False) as dag:

    training_model_A = PythonOperator(
        task_id = "training_model_A",
        python_callable=_training_model()
    )
    training_model_B = PythonOperator(
        task_id = "training_model_B",
        python_callable=_training_model()
    )
    training_model_C = PythonOperator(
        task_id = "training_model_B",
        python_callable=_training_model()
    )
    choose_best_model = BranchPythonOperator(
        task_id = "choose_best_model",
        python_callable= _choose_best_model()
    )
    accurate = BashOperator(
        task_id ="accurate",
        bash_command="echo Accurate"
    )
    inaccurate = BashOperator(
        task_id ="inaccurate",
        bash_command="echo Inacurate"
    )
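Solution
The python_callable parameter expects the callable object itself, that is, the function name without parentheses, not the result of calling it. Like this: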
...
    training_model_A = PythonOperator(
        task_id = "training_model_A",
        python_callable=_training_model
    )
    training_model_B = PythonOperator(
        task_id = "training_model_B",
        python_callable=_training_model
    )
    training_model_C = PythonOperator(
        task_id = "training_model_C",
        python_callable=_training_model
    )
    choose_best_model = BranchPythonOperator(
        task_id = "choose_best_model",
        python_callable= _choose_best_model
    )
...
FYI: in the snippet above I also changed the task_id of the task assigned to training_model_C, because it duplicated the task_id "training_model_B". In Airflow, task_id values must be unique within a DAG.