首页 > 解决方案 > Python运算符中的气流宏

问题描述

我正在尝试在我的 Python 运算符中使用气流宏,但我不断收到“气流:错误:无法识别的参数:”

所以我正在导入一个具有 3 个位置参数的函数:(sys.argv,start_date,end_date),我希望将start_dateend_date设为Airflow 中的执行日期。

函数参数看起来像这样

def main(argv,start_date,end_date):

这是我在 DAG 中的任务:

t1 = PythonOperator(
    task_id='Pull_DCM_Report',
    provide_context=True,
    python_callable=main,
    op_args=[sys.argv,'{{ ds }}','{{ ds }}'],
    dag=dag)

标签: airflow

解决方案


由于您要传入需要由 Airflow 呈现的日期,因此您需要templates_dict在 Python 运算符中使用该参数。此字段是 Airflow 将识别为包含模板的唯一字段。

您可以创建自定义 Python 运算符,通过复制现有运算符并将相关字段添加到template_fields元组中,将更多字段识别为模板。

def main(**kwargs):
    argv = kwargs.get('templates_dict').get('argv')
    start_date = kwargs.get('templates_dict').get('start_date')
    end_date = kwargs.get('templates_dict').get('end_date')


t1 = PythonOperator(task_id='Pull_DCM_Report',
                    provide_context=True,
                    python_callable=main,
                    templates_dict={'argv': sys.argv,
                                    'start_date': '{{ yesterday_ds }}',
                                    'end_date': '{{ ds }}'},
                    dag=dag)

推荐阅读