首页 > 解决方案 > PostgresHook 中的 Airflow conf 参数抛出错误

问题描述

该脚本从气流配置 {"flag":"NA","metric_name":"RED"} 获取输入参数。我正在尝试在 sql 的 where 子句中使用一个参数值。有人可以检查我传递 where 原因的方式是否有任何问题吗?

错误:-“{”处或附近的语法错误

def get_metrics(**kwargs):
    varteam_flag=(kwargs['dag_run'].conf['flag'])
    print("flag :",varteam_flag)
    params={"param1": varteam_flag}
    print(" params :",params['param1'])
    conn_id = kwargs.get('conn_id')
    pg_hook = PostgresHook(conn_id)
    sql="select count(1) as cnt FROM odw.metrics where  flag = {{ params.param1 }} "
    print(sql)
    records = pg_hook.get_records(sql)
    print("Records count is  :",str(records))
    return records

getData_metrics=PythonOperator(task_id='getData_metrics', python_callable=get_metrics,op_kwargs {'conn_id':'veas'},provide_context=True, dag=dag)

标签: airflow

解决方案


您不能以您的方式在可调用的 python 中使用 Jinja。调用运算符时正在评估 Jinja,因此在您的情况下,jinja 引擎不适用于 sql 语句。

您需要做的只是使用一个简单的分配:

sql=f"select count(1) as cnt FROM odw.metrics where  flag = {params.param1} "

我不太确定你为什么要做这么多我相信的任务

sql=f"select count(1) as cnt FROM odw.metrics where flag = {varteam_flag} "

应该提供相同的sql语句。


推荐阅读