python - 如何在 Azure Databricks PySpark 中执行存储过程?
问题描述
我可以在 Azure Databricks 中使用 PySpark 执行简单的 SQL 语句,但我想改为执行存储过程。下面是我尝试过的 PySpark 代码。
#initialize pyspark
import findspark
findspark.init('C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd
#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)
table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
.option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
.option("dbtable", table) \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.load()
#show the data loaded into dataframe
#jdbcDF.show()
sqlQueries="execute testJoin"
resultDF=spark.sql(sqlQueries)
resultDF.show(resultDF.count(),False)
这不起作用——我该怎么做?
解决方案
如果有人仍在寻找有关如何执行此操作的方法,则可以使用 spark session 的内置 jdbc-connector。以下代码示例可以解决问题:
import msal
# Set url & credentials
jdbc_url = ...
tenant_id = ...
sp_client_id = ...
sp_client_secret = ...
# Write your SQL statement as a string
name = "Some passed value"
statement = f"""
EXEC Staging.SPR_InsertDummy
@Name = '{name}'
"""
# Generate an OAuth2 access token for service principal
authority = f"https://login.windows.net/{tenant_id}"
app = msal.ConfidentialClientApplication(sp_client_id, sp_client_secret, authority)
token = app.acquire_token_for_client(scopes="https://database.windows.net/.default")["access_token"]
# Create a spark properties object and pass the access token
properties = spark._sc._gateway.jvm.java.util.Properties()
properties.setProperty("accessToken", token)
# Fetch the driver manager from your spark context
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
# Create a connection object and pass the properties object
con = driver_manager.getConnection(jdbc_url, properties)
# Create callable statement and execute it
exec_statement = con.prepareCall(statement)
exec_statement.execute()
# Close connections
exec_statement.close()
con.close()
有关使用 SQL 用户凭据通过 JDBC 进行连接的更多信息和类似方法,或者有关如何获取返回参数的信息,我建议您查看这篇博文:
推荐阅读
- flutter - 如何在多部分请求中的放置请求中设置参数化内容类型
- gradle - 使用 testcontainers 和 gradle 运行测试的 github 操作
- docker - VSCode DevContainers:如何在 MAC 和 Windows 上挂载主文件
- python - 在 Databricks 上找不到 config.yml
- django - Django 查询包括连接
- c++ - gcc 和 clang 中的 atomic_flag.wait/notify_one 呢?
- sql - BigQuery,在具有相同列名的 2 个表之间减去
- javascript - 如何在 gulp 中动态设置目标路径?
- c - 对返回值超过 127 的 char 执行操作时出现分段错误
- python - 如何在 pandas Dataframe 中对不同月份应用不同的条件?