python-3.x - Airflow 2.0 - AttributeError:“MyOperator”对象没有属性“kwargs”
问题描述
我正在尝试为 Airflow 2.0 编写自定义运算符,但我似乎无法理解为什么运算符无法识别该kwargs
参数。
这是我的自定义运算符文件
from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
name,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.name = name
def execute(self, context):
return self.kwargs
这是我的一天:
from datetime import timedelta
from airflow import DAG
from airflow.utils.dates import days_ago
from operators.custom import MyOperator
args = {
'owner': 'airflow',
}
with DAG(
dag_id='ex_operator',
default_args=args,
schedule_interval='0 0 * * *',
start_date=days_ago(1),
dagrun_timeout=timedelta(minutes=60)
) as dag:
custom_ops = MyOperator(
task_id = 'myop_id',
name = 'me',
params = {
'lib': 'rainy'
}
)
当我运行它时,我得到了错误:
Traceback (most recent call last):
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1086, in _run_raw_task
self._prepare_and_execute_task_with_callbacks(context, task)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1260, in _prepare_and_execute_task_with_callbacks
result = self._execute_task(context, task_copy)
File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 1300, in _execute_task
result = task_copy.execute(context=context)
File "/home/airflow/plugins/operators/custom.py", line 33, in execute
return self.kwargs
AttributeError: 'MyOperator' object has no attribute 'kwargs'
有什么我做错了,为什么这不能识别在任何普通 Python 类中都可以识别的 kwargs?
解决方案
args
并且kwargs
没有被分配,超级班级也没有这样做。这解决了它。
class MyOperator(BaseOperator):
@apply_defaults
def __init__(self,
name,
*args,
**kwargs):
super(MyOperator, self).__init__(*args, **kwargs)
self.name = name
self.args = args
self.kwargs = kwargs
更新:在BaseOperator
超类中,我看到了这个:
super().__init__()
if kwargs:
if not conf.getboolean('operators', 'ALLOW_ILLEGAL_ARGUMENTS'):
raise AirflowException(
"Invalid arguments were passed to {c} (task_id: {t}). Invalid "
"arguments were:\n**kwargs: {k}".format(c=self.__class__.__name__, k=kwargs, t=task_id),
)
warnings.warn(
'Invalid arguments were passed to {c} (task_id: {t}). '
'Support for passing such arguments will be dropped in '
'future. Invalid arguments were:'
'\n**kwargs: {k}'.format(c=self.__class__.__name__, k=kwargs, t=task_id),
category=PendingDeprecationWarning,
stacklevel=3,
)
目前,您必须allow_illegal_arguments
在airflow.cfg
文件中true
进行设置才能将其他/未使用的参数传递给 BaseOperator。但是,Airflow 似乎有计划在未来弃用它。
推荐阅读
- postgresql - 为什么 autovacuum 没有运行
- php - PHP - 如何获得多维数组的总和?
- c - 如何从线程中检索和比较结果值?
- c# - 用于带有 Microsoft IoT 的 UWP 应用程序的 C# Logger
- python - 使用神经网络(使用 python)进行医学图像分类的简单方法,无需在我的机器上进行训练
- android - Google Maps Android Sdk 集成,无需计费帐户(无需信用卡)
- java - 存储 DStream、检查点、持久化?
- rate-limiting - Lyft 是否会限制对整个应用程序 API 的请求?
- r - 使用字符串值选择数据框并更改其列名
- javascript - 如何 Datalayer.push 复选框变量