首页 > 解决方案 > 将宏值传递给气流中的sql文件

问题描述

我有一个 sql 文件,有一个 sql 查询:-

delete from xyz where id in = 3 and time = '{{ execution_date.subtract(hours=2).strftime("%Y-%m-%d %H:%M:%S") }}';

在这里,我在 sql 查询本身中编写宏,我想从操作员调用此 sql 查询的 python 文件中传递它的值。

time = f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''

我想将此全局时间变量传递给 sql 文件,而不是再次在那里编写完整的宏。

PostgresOperator(dag=dag,
                 task_id='delete_entries', 
                 postgres_conn_id='database_connection',
                 sql='sql/delete_entry.sql')

如果我time在查询中使用 jinja 模板 as {{ time }},而不是评估它,它只作为一个完整的字符串传递。请帮助,坚持了很长时间。

标签: pythonsqlvariablesjinja2airflow

解决方案


由于您想f'\'{{{{ execution_date.subtract(hours= {value1}).strftime("%Y-%m-%d %H:%M:%S") }}}}\''在不复制代码的情况下使用两个运算符,您可以将其定义为用户宏。

from datetime import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator


def ds_macro_format(execution_date, hours):
    return execution_date.subtract(hours=hours).strftime("%Y-%m-%d %H:%M:%S")


user_macros = {
    'format': ds_macro_format
}

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2021, 6, 7),
}

dag = DAG(
    "stackoverflow_question1",
    default_args=default_args,
    schedule_interval="@daily",
    user_defined_macros=user_macros
)

PostgresOperator(dag=dag,
                 task_id='delete_entries',
                 postgres_conn_id='database_connection',
                 sql='sql/delete_entry.sql')

delete_entry.sql作为:

delete from xyz where id in = 3 and time = {{ format(execution_date, hours=2) }};

在此处输入图像描述

假设您还想在 BashOperator 中使用宏,您可以这样做:

BashOperator(
    task_id='bash_task',
    bash_command='echo {{ format(execution_date, hours=2) }} ',
    dag=dag,
)

在此处输入图像描述


推荐阅读