首页 > 解决方案 > Airflow 2.0 - AttributeError:“MyOperator”对象没有属性“kwargs”

问题描述

我正在尝试为 Airflow 2.0 编写自定义运算符,但我似乎无法理解为什么运算符无法识别该kwargs参数。

这是我的自定义运算符文件

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                 name,
                 *args,
                 **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):

        return self.kwargs

这是我的一天:

from datetime import timedelta

from airflow import DAG
from airflow.utils.dates import days_ago
from operators.custom import MyOperator

args = {
    'owner': 'airflow',
}

with DAG(
    dag_id='ex_operator',
    default_args=args,
    schedule_interval='0 0 * * *',
    start_date=days_ago(1),
    dagrun_timeout=timedelta(minutes=60)
) as dag:

    custom_ops = MyOperator(
        task_id = 'myop_id',
        name = 'me',
        params = {
            'lib': 'rainy'
        }
    )

当我运行它时,我得到了错误:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1086, in _run_raw_task
    self._prepare_and_execute_task_with_callbacks(context, task)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1260, in _prepare_and_execute_task_with_callbacks
    result = self._execute_task(context, task_copy)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1300, in _execute_task
    result = task_copy.execute(context=context)
  File "/home/airflow/plugins/operators/custom.py", line 33, in execute
    return self.kwargs

AttributeError: 'MyOperator' object has no attribute 'kwargs'

有什么我做错了,为什么这不能识别在任何普通 Python 类中都可以识别的 kwargs?

标签: python-3.xairflow

解决方案


args并且kwargs没有被分配,超级班级也没有这样做。这解决了它。

class MyOperator(BaseOperator):

    @apply_defaults
    def __init__(self,
                 name,
                 *args,
                 **kwargs):
        super(MyOperator, self).__init__(*args, **kwargs)
        self.name = name
        self.args = args
        self.kwargs = kwargs

更新:在BaseOperator超类中,我看到了这个:

super().__init__()
        if kwargs:
            if not conf.getboolean('operators', 'ALLOW_ILLEGAL_ARGUMENTS'):
                raise AirflowException(
                    "Invalid arguments were passed to {c} (task_id: {t}). Invalid "
                    "arguments were:\n**kwargs: {k}".format(c=self.__class__.__name__, k=kwargs, t=task_id),
                )
            warnings.warn(
                'Invalid arguments were passed to {c} (task_id: {t}). '
                'Support for passing such arguments will be dropped in '
                'future. Invalid arguments were:'
                '\n**kwargs: {k}'.format(c=self.__class__.__name__, k=kwargs, t=task_id),
                category=PendingDeprecationWarning,
                stacklevel=3,
            )

目前,您必须allow_illegal_argumentsairflow.cfg文件中true进行设置才能将其他/未使用的参数传递给 BaseOperator。但是,Airflow 似乎有计划在未来弃用它。


推荐阅读