How to execute a stored procedure from PySpark in Azure Databricks?

Problem description

I can run simple SQL statements with PySpark in Azure Databricks, but I want to execute a stored procedure instead. Below is the PySpark code I have tried.

# Initialize PySpark from a local Spark installation (raw string avoids backslash escapes)
import findspark
findspark.init(r'C:\Spark\spark-2.4.5-bin-hadoop2.7')
#import required modules
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import *
import pandas as pd

#Create spark configuration object
conf = SparkConf()
conf.setMaster("local").setAppName("My app")
#Create spark context and sparksession
sc = SparkContext.getOrCreate(conf=conf)
spark = SparkSession(sc)

table = "dbo.test"
#read table data into a spark dataframe
jdbcDF = spark.read.format("jdbc") \
    .option("url", f"jdbc:sqlserver://localhost:1433;databaseName=Demo;integratedSecurity=true;") \
    .option("dbtable", table) \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .load()

#show the data loaded into dataframe
#jdbcDF.show()
sqlQueries="execute testJoin"
resultDF=spark.sql(sqlQueries)
resultDF.show(resultDF.count(),False)

This doesn't work. How can I execute the stored procedure instead?

Tags: python, pyspark-sql, azure-databricks, pyspark-dataframes

Solution

If anyone is still looking for a way to do this, you can use the JDBC connector built into the Spark session. The following code example should do the trick:

import msal

# Set url & credentials
jdbc_url = ...
tenant_id = ...
sp_client_id = ...
sp_client_secret = ...

# Write your SQL statement as a string
name = "Some passed value"

statement = f"""
EXEC Staging.SPR_InsertDummy
  @Name = '{name}'
"""

# Generate an OAuth2 access token for the service principal
# (acquire_token_for_client expects scopes as a list, not a bare string)
authority = f"https://login.windows.net/{tenant_id}"
app = msal.ConfidentialClientApplication(sp_client_id, client_credential=sp_client_secret, authority=authority)
token = app.acquire_token_for_client(scopes=["https://database.windows.net/.default"])["access_token"]

# Create a java.util.Properties object via the py4j gateway and attach the access token
properties = spark._sc._gateway.jvm.java.util.Properties()
properties.setProperty("accessToken", token)

# Fetch the JDBC DriverManager class from the JVM
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager

# Create a connection object and pass the properties object
con = driver_manager.getConnection(jdbc_url, properties)

# Create callable statement and execute it
exec_statement = con.prepareCall(statement)
exec_statement.execute()

# Close connections
exec_statement.close()
con.close()
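
Note that prepareCall goes through the py4j gateway, so the statement executes on the driver node, not on the executors. In production code you would typically wrap the execute/close calls in try/finally so the connection is released even if the procedure raises an error.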

For more information, a similar approach that connects over JDBC with SQL user credentials, and details on how to fetch return parameters, I recommend this blog post:

https://medium.com/delaware-pro/executing-ddl-statements-stored-procedures-on-sql-server-using-pyspark-in-databricks-2b31d9276811
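
As a rough sketch of that SQL-credentials variant, and of reading a procedure's return value, the snippet below reuses the same py4j/JDBC approach. The server, database, login, and the assumption that Staging.SPR_InsertDummy returns an integer status are placeholders for illustration, not values from the original post:

# Minimal sketch: SQL authentication plus a return value, via the same py4j gateway.
# Server, database, user, password, and procedure signature are hypothetical.
jvm = spark._sc._gateway.jvm

jdbc_url = "jdbc:sqlserver://myserver.database.windows.net:1433;databaseName=Demo"  # placeholder

properties = jvm.java.util.Properties()
properties.setProperty("user", "sql_username")      # placeholder SQL login
properties.setProperty("password", "sql_password")  # placeholder password

con = jvm.java.sql.DriverManager.getConnection(jdbc_url, properties)

# JDBC escape syntax: parameter 1 is bound to the procedure's return code,
# parameter 2 to the @Name argument.
stmt = con.prepareCall("{? = call Staging.SPR_InsertDummy(?)}")
stmt.registerOutParameter(1, jvm.java.sql.Types.INTEGER)
stmt.setString(2, "Some passed value")
stmt.execute()

print("Return code:", stmt.getInt(1))  # read the procedure's return value

stmt.close()
con.close()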
