airflow - PostgresHook 中的 Airflow conf 参数抛出错误
问题描述
该脚本从气流配置 {"flag":"NA","metric_name":"RED"} 获取输入参数。我正在尝试在 sql 的 where 子句中使用一个参数值。有人可以检查我传递 where 原因的方式是否有任何问题吗?
错误:-“{”处或附近的语法错误
def get_metrics(**kwargs):
varteam_flag=(kwargs['dag_run'].conf['flag'])
print("flag :",varteam_flag)
params={"param1": varteam_flag}
print(" params :",params['param1'])
conn_id = kwargs.get('conn_id')
pg_hook = PostgresHook(conn_id)
sql="select count(1) as cnt FROM odw.metrics where flag = {{ params.param1 }} "
print(sql)
records = pg_hook.get_records(sql)
print("Records count is :",str(records))
return records
getData_metrics=PythonOperator(task_id='getData_metrics', python_callable=get_metrics,op_kwargs {'conn_id':'veas'},provide_context=True, dag=dag)
解决方案
您不能以您的方式在可调用的 python 中使用 Jinja。调用运算符时正在评估 Jinja,因此在您的情况下,jinja 引擎不适用于 sql 语句。
您需要做的只是使用一个简单的分配:
sql=f"select count(1) as cnt FROM odw.metrics where flag = {params.param1} "
我不太确定你为什么要做这么多我相信的任务
sql=f"select count(1) as cnt FROM odw.metrics where flag = {varteam_flag} "
应该提供相同的sql语句。
推荐阅读
- json - 从 Get 到 Interface 的 Angular Map 对象
- php - 文件夹控制器创建为空
- javascript - 角度的FabricJS自定义过滤器
- flask - rabbitmq Pika 连接频繁断开
- r - 如果日期匹配,则将来自不同数据帧的列相乘
- c++ - 指定基类方法模板类型 (c++)
- python - 将 NxN 矩阵中的值分组为 N/2 x N/2 矩阵
- c++ - operator-> 有什么特别之处,它是如何工作的?
- python - 通过 xpath 查找元素,其中 xpath 具有变量
- javascript - 我在 firebase.auth.ApplicationVerifier 有问题