python - 如何扩展 PythonOperator
问题描述
我正在尝试自定义我的 PythonOperator 并将其放在 $AIRFLOW_HOME/plugins 下,如下所示:
class MyPythonOperator(PythonOperator):
def my_callable(param1, param2, param3):
# do something
@apply_defaults
def __init__(self, task_id, *args, **kwargs):
super(MyPythonOperator, self).__init__(
task_id=task_id,
python_callable = self.my_callable,
provide_context = True,
*args, **kwargs)
然后我定义了一个气流 dag 代码,它非常简单,只有两个任务:
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='example_workflow',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
task1 = MyPythonOperator(
task_id='task1',
params={'param1': 'param1_value',
'param2': 'param2_value',
'param3': 'param3_value'},
dag=dag
)
task2 = MyPythonOperator(
task_id='task2',
params={'param1': 'param1_value',
'param2': 'param2_value',
'param2': 'param3_value'},
dag=dag
)
task1 >> task2
但是在我运行 dag python 代码后,得到错误消息:
$ python example_airflow_code.py
[2019-05-15 19:51:10,338] {__init__.py:51} INFO - Using executor SequentialExecutor
usage: example_airflow_code.py [-h]
{list_tasks,backfill,test,run,pause,unpause,list_dag_runs}
...
example_airflow_code.py: error: too few arguments
我尝试了一些调试,并在这一行插入了一个断点:
super(MyPythonOperator, self).__init__()
在调用超级构造函数之前发现self.dag和self.dag_id的值异常,值为:
str: Traceback (most recent call last):
File "/Applications/Eclipse.app/Contents/Eclipse/plugins/org.python.pydev.core_6.4.4.201807281807/pysrc/_pydevd_bundle/pydevd_resolver.py", line 166, in _getPyDictionary
attr = getattr(var, n)
File "/Users/zhuangxy/anaconda2/lib/python2.7/site-packages/airflow/models/__init__.py", line 2399, in dag_id
return 'adhoc_' + self.owner
AttributeError: 'MyPythonOperator' object has no attribute 'owner'
有人知道这个例子有什么问题吗?非常感谢!
解决方案
我最近也遇到了这个。看来您缺少context
自定义 PythonOperator 上的参数。
更改您的方法定义,使其看起来像这样:
def my_callable(param1, param2, param3, **context):
# do something
失败的原因是provide_context=True
您在运算符中提供的标志。出于某种原因,python 可调用对象正在您的参数中寻找它。
推荐阅读
- sql-server - 错误:超出最大存储过程、函数、触发器或视图嵌套级别(限制 32)
- python - 传递自定义类型以单击 python
- kotlin - 我正在尝试以任何方式分发我的 TornadoFX 桌面应用程序(导出到 .jar 可执行文件或安装),但我似乎无法弄清楚如何
- python - 调试器不起作用:FileNotFoundError:[Errno 2] 没有这样的文件或目录:
- java - Apache-Pulsar Java 客户端可以在 RaspberryPi4 (ARMv8) 上运行吗?
- amazon-web-services - 为什么我收到以下错误?“放置 S3 策略时出错:MalformedPolicy:策略的操作无效”
- contact-form-7 - Withh contact form 7 获取自定义字段值
- shell - 像这张图片一样添加水印
- r - 如何在 RStudio 的折叠中折叠代码
- nativescript - 添加新项目非常慢