首页 > 解决方案 > 如何在 Airflow 中将参数传递给 PythonOperator

问题描述

我刚开始使用Airflow,谁能告诉我如何将参数传递给PythonOperator,如下所示:

t5_send_notification = PythonOperator(
    task_id='t5_send_notification',
    provide_context=True,
    python_callable=SendEmail,
    op_kwargs=None,
    #op_kwargs=(key1='value1', key2='value2'),
    dag=dag,
)

def SendEmail(**kwargs):
    msg = MIMEText("The pipeline for client1 is completed, please check.")
    msg['Subject'] = "xxxx"
    msg['From'] = "xxxx"
    ......
    s = smtplib.SMTP('localhost')
    s.send_message(msg)
    s.quit()

我希望能够将一些参数传递给t5_send_notification's callable SendEmail,理想情况下,我想将完整日志和/或日志的一部分(基本上来自 kwargs)附加到要发送的电子邮件中,猜测这t5_send_notification是收集这些信息的地方。

非常感谢。

标签: pythonairflow

解决方案


  1. 将 dict 对象传递给op_kwargs
  2. 使用键从 python 可调用的kwargs dict访问它们的值

    def SendEmail(**kwargs):
        print(kwargs['key1'])
        print(kwargs['key2'])
        msg = MIMEText("The pipeline for client1 is completed, please check.")
        msg['Subject'] = "xxxx"
        msg['From'] = "xxxx"
        ......
        s = smtplib.SMTP('localhost')
        s.send_message(msg)
        s.quit()
    
    
    t5_send_notification = PythonOperator(
        task_id='t5_send_notification',
        provide_context=True,
        python_callable=SendEmail,
        op_kwargs={'key1': 'value1', 'key2': 'value2'},
        dag=dag,
    )
    

推荐阅读