python - 如何在 Airflow 中将参数传递给 PythonOperator
问题描述
我刚开始使用Airflow,谁能告诉我如何将参数传递给PythonOperator,如下所示:
t5_send_notification = PythonOperator(
task_id='t5_send_notification',
provide_context=True,
python_callable=SendEmail,
op_kwargs=None,
#op_kwargs=(key1='value1', key2='value2'),
dag=dag,
)
def SendEmail(**kwargs):
msg = MIMEText("The pipeline for client1 is completed, please check.")
msg['Subject'] = "xxxx"
msg['From'] = "xxxx"
......
s = smtplib.SMTP('localhost')
s.send_message(msg)
s.quit()
我希望能够将一些参数传递给t5_send_notification
's callable SendEmail
,理想情况下,我想将完整日志和/或日志的一部分(基本上来自 kwargs)附加到要发送的电子邮件中,猜测这t5_send_notification
是收集这些信息的地方。
非常感谢。
解决方案
- 将 dict 对象传递给op_kwargs
使用键从 python 可调用的kwargs dict访问它们的值
def SendEmail(**kwargs): print(kwargs['key1']) print(kwargs['key2']) msg = MIMEText("The pipeline for client1 is completed, please check.") msg['Subject'] = "xxxx" msg['From'] = "xxxx" ...... s = smtplib.SMTP('localhost') s.send_message(msg) s.quit() t5_send_notification = PythonOperator( task_id='t5_send_notification', provide_context=True, python_callable=SendEmail, op_kwargs={'key1': 'value1', 'key2': 'value2'}, dag=dag, )
推荐阅读
- java - 如何进行测试,测试字符串是否有大写、小写、特殊字符?
- java - 在 Spring 应用程序中添加 Apache Camel 自定义组件/端点
- android-studio - java.lang.NullPointerException:null 不能转换为非 null 类型 com.google.android.gms.maps.SupportMapFragment
- javascript - 第二次 window.onload 使用
- windows - 如何在 Windows 10 Home 上查询 RDP 影子的会话 ID?
- jquery - 定位 MC Datepicker 模态
- python-3.x - 将长序列写入 csv 或 parquet:IO 对长时间运行的实验时间的影响?
- rust - 如何在库中使用 pub(crate) 方法具有公共特征?
- algorithm - 图中没有重复的最短路径
- scala - 如何将此 SetLike 集合从 Scala 2.12 转换为 2.13?