apache-spark - 如何在查询中调用 oracle 存储过程?
问题描述
在我的 spark 项目中,我使用的是 spark-sql-2.4.1v。
作为我的代码的一部分,我需要在我的 spark 作业中调用 oracle 存储过程。
我们正在将一个旧项目转换为具有大量基于 oracle 存储过程的逻辑的 spark。我们正在转换为 spark 的中间件逻辑......所以希望保持 procs 逻辑原样,因为有其他应用程序使用它们......因此需要在 spark 代码中调用现有的 procs。
如何调用oracle存储过程?
解决方案
python 中的 cx_Oracle 模块可用于从 python / pyspark 脚本调用 oracle 存储过程。文档在这里 - https://cx-oracle.readthedocs.io/en/latest/user_guide/plsql_execution.html
如果由于任何原因,cx_Oracle 在 hadoop 环境中不起作用(因为它需要安装 oracle 客户端二进制文件),我们可以使用下面的 Spark JDBC 方法。
在 PySpark JDBC 选项中,有一个名为 sessionInitStatement 的属性,可用于在 JDBC 进程开始读取数据之前执行自定义 SQL 语句或 PL/SQL 块。spark JDBC 读取中的此选项可用于调用存储过程,如下所示。
这里首先我们使用 sessionInitStatement 执行 PL/SQL 过程,然后使用 spark/jdbc 读取从存储过程中读取最终数据集。
from pyspark.sql import SparkSession, HiveContext
spark = (SparkSession.builder.enableHiveSupport().getOrCreate())
# Provide PL/SQL code here - call the stored proc within BEGIN and END block.
plsql_block = """
BEGIN
SCHEMA.STORED_PROC_NAME;
END;
"""
# Read the final table that is created / updated within the stored proc.
count_query = """
(
select count(*) from SCHMA.TABLE_NAME
) t1
"""
df = spark.read \
.format("jdbc") \
.option("url", "JDBC_URL") \
.option('driver', 'oracle.jdbc.driver.OracleDriver') \
.option("oracle.jdbc.timezoneAsRegion", "false") \
.option("sessionInitStatement", plsql_block) \
.option("dbtable", count_query) \
.option("user", "USER_ID") \
.option("password", "PASSWORD") \
.load()
print("Total Records")
df.show(10, False)
推荐阅读
- android - 导航组件生命周期
- python-3.x - websocket的握手源代码中的laddr和raddr是什么
- python - 如何将未知数量的数据插入 sqlite3 数据库
- c++ - 如何在 C++ 中修复“错误:从 'int' 到 'const char*' [-fpermissive]' 的无效转换?
- jquery - 无法使用 Angular Datatable 重新初始化 DataTable
- android - 为什么返回 Settings.Secure.getString(context.getContentResolver(), Settings.Secure.ANDROID_ID) 一个空字符串;
- reactjs - 从另一个屏幕反应本机更新平面列表
- c# - 部署后在 VDI 中出现错误
- python - 如何在正在运行的 python 脚本中启动其他脚本(新线程或进程?)
- azure - 发布管道中的 Azure Devops 日志记录命令