首页 > 解决方案 > 如何使用来自 Airflow OracleOperator 的多个语句执行 sql 脚本

问题描述

我正在尝试调用一个带有多个语句的 sql 文件,这些语句由 ; 分隔。通过气流中的 OracleOperator ,但它通过多个语句给出以下错误

例如文件包含

调用 DROP_OBJECTS('TABLE_XYZ');

CREATE TABLE TABLE_XYZ AS SELECT 1 Dummy from DUAL;

[2019-06-18 18:19:12,582] { init .py:1580} 错误 - ORA-00933:SQL 命令未正确结束 Traceback(最后一次调用):文件“/usr/local/lib/python3.6 /site-packages/airflow/models/ init .py”,第 1441 行,在 _run_raw_task 结果 = task_copy.execute(context=context) 文件“/usr/local/lib/python3.6/site-packages/airflow/operators/ oracle_operator.py”,第 63 行,在执行参数=self.parameters) 文件“/usr/local/lib/python3.6/site-packages/airflow/hooks/dbapi_hook.py”,第 172 行,运行 cur.execute (s) cx_Oracle.DatabaseError: ORA-00933: SQL 命令未正确结束

即使单个语句以 ; 结尾 给出以下错误:

例如文件

CREATE TABLE TABLE_XYZ AS SELECT 1 Dummy from DUAL;

[2019-06-18 17:47:53,137] { init .py:1580} 错误 - ORA-00922:缺少或无效选项 Traceback(最近一次调用最后):文件“/usr/local/lib/python3.6/ site-packages/airflow/models/ init .py”,第 1441 行,在 _run_raw_task 结果 = task_copy.execute(context=context) 文件“/usr/local/lib/python3.6/site-packages/airflow/operators/oracle_operator .py”,第 63 行,在执行参数=self.parameters) 文件“/usr/local/lib/python3.6/site-packages/airflow/hooks/dbapi_hook.py”,第 172 行,运行 cur.execute( s)

with DAG('my_simple_dag',
     default_args=default_args,
     template_searchpath=['/root/rahul/'],
     schedule_interval='*/10 * * * *',
     ) as dag:

opr_oracle = OracleOperator(task_id='oracleTest',oracle_conn_id='STG',
                    sql='test.sql')

我是否需要传递任何其他参数以使 dbhook 了解文件需要在单独的语句中拆分?

根据文档,它期望参数 sql:要执行的 sql 代码。可以接收表示 sql 语句的 str、str 列表(sql 语句)或对模板文件的引用。模板引用由以“.sql”结尾的 str 识别(模板化)

但 .sql 模板不适用于多个语句。任何帮助将不胜感激 。谢谢 !!

标签: python-3.xairflowmysql-python

解决方案


Oracle Operator 将采用模板化的 SQL 字符串列表。

我所做的是将 SQL 文件作为文本文件读入,然后将其拆分为 ';' 创建字符串列表。

with open('/home/airflow/airflow/dags/sql/test_multi.sql') as sql_file:
    sql_list = list(filter(None, sql_file.read().split(';')))

t_run_sql = OracleOperator(task_id='run_sql', 
                                    sql=sql_list, 
                                    oracle_conn_id='user_id',
                                    autocommit=True,
                                    depends_on_past=True,
                                    dag=dag)

我用模板对此进行了测试(是的,如果不先创建表,这将在 Oracle 中失败):

drop table test_multi;

create table test_multi as
select
  {{ macros.datetime.strftime(execution_date.in_tz('Australia/Sydney') + macros.timedelta(days=1),'%Y%m%d') }} as day1, 
  {{ macros.datetime.strftime(execution_date.in_tz('Australia/Sydney') + macros.timedelta(days=2),'%Y%m%d') }} as day2,
  {{ macros.datetime.strftime(execution_date.in_tz('Australia/Sydney') + macros.timedelta(days=3),'%Y%m%d') }} as day3
from dual;

insert into test_multi
select
  {{ macros.datetime.strftime(execution_date.in_tz('Australia/Sydney') + macros.timedelta(days=4),'%Y%m%d') }} as day1, 
  {{ macros.datetime.strftime(execution_date.in_tz('Australia/Sydney') + macros.timedelta(days=5),'%Y%m%d') }} as day2,
  {{ macros.datetime.strftime(execution_date.in_tz('Australia/Sydney') + macros.timedelta(days=6),'%Y%m%d') }} as day3
from dual;

此解决方案需要确保您的 SQL 在其他任何地方都不包含分号。我还认为在 ';/n' 上拆分可能会更好,但它要求用户总是在 ';' 之后开始换行,所以仍然不理想。

我还发现我需要用过滤器(无,...)处理最后一个分号,否则操作员会向数据库提交一个空命令,然后出错。


推荐阅读