sql-server - 无法使用 PySpark 插入 SQL,但可以在 SQL 中使用
问题描述
我使用以下方法在 SQL 中创建了一个表:
CREATE TABLE [dbo].[Validation](
[RuleId] [int] IDENTITY(1,1) NOT NULL,
[AppId] [varchar](255) NOT NULL,
[Date] [date] NOT NULL,
[RuleName] [varchar](255) NOT NULL,
[Value] [nvarchar](4000) NOT NULL
)
注意身份密钥(RuleId)
在 SQL 中将值插入到表中时,它可以工作:
注意:如果表为空并递增,则不按原样插入主键将自动填充
INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')
但是,当在 databricks 上创建临时表并执行下面的相同查询时,在 PySpark 上运行此查询,如下所示:
%python
driver = <Driver>
url = "jdbc:sqlserver:<URL>"
database = "<db>"
table = "dbo.Validation"
user = "<user>"
password = "<pass>"
#import the data
remote_table = spark.read.format("jdbc")\
.option("driver", driver)\
.option("url", url)\
.option("database", database)\
.option("dbtable", table)\
.option("user", user)\
.option("password", password)\
.load()
remote_table.createOrReplaceTempView("YOUR_TEMP_VIEW_NAMES")
sqlcontext.sql("INSERT INTO YOUR_TEMP_VIEW_NAMES VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")
我收到以下错误:
AnalysisException:'未知要求要插入的数据与目标表具有相同的列数:目标表有 5 列,但插入的数据有 4 列,包括 0 个具有常数的分区列值。;'
为什么它可以在 SQL 上工作,但在通过数据块传递查询时却不行?如何在不出现此错误的情况下通过 pyspark 插入?
解决方案
这里最直接的解决方案是使用 Scala 单元中的 JDBC。例如
%scala
import java.util.Properties
import java.sql.DriverManager
val jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser")
val jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
// Create the JDBC URL without passing in the user and password parameters.
val jdbcUrl = s"jdbc:sqlserver://xxxx.database.windows.net:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
// Create a Properties() object to hold the parameters.
val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
connectionProperties.setProperty("Driver", driverClass)
val connection = DriverManager.getConnection(jdbcUrl, jdbcUsername, jdbcPassword)
val stmt = connection.createStatement()
val sql = "INSERT INTO dbo.Validation VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')"
stmt.execute(sql)
connection.close()
您也可以使用 pyodbc,但默认情况下未安装 SQL Server ODBC 驱动程序,而 JDBC 驱动程序已安装。
Spark 解决方案是在 SQL Server 中创建一个视图并插入。例如
create view Validation2 as
select AppId,Date,RuleName,Value
from Validation
然后
tableName = "Validation2"
df = spark.read.jdbc(url=jdbcUrl, table=tableName, properties=connectionProperties)
df.createOrReplaceTempView(tableName)
sqlContext.sql("INSERT INTO Validation2 VALUES ('TestApp','2020-05-15','MemoryUsageAnomaly','2300MB')")
如果你想封装 Scala 并从另一种语言(如 Python)调用它,你可以使用 scala包 cell。
例如
%scala
package example
import java.util.Properties
import java.sql.DriverManager
object JDBCFacade
{
def runStatement(url : String, sql : String, userName : String, password: String): Unit =
{
val connection = DriverManager.getConnection(url, userName, password)
val stmt = connection.createStatement()
try
{
stmt.execute(sql)
}
finally
{
connection.close()
}
}
}
然后你可以这样称呼它:
jdbcUsername = dbutils.secrets.get(scope = "kv", key = "sqluser")
jdbcPassword = dbutils.secrets.get(scope = "kv", key = "sqlpassword")
jdbcUrl = "jdbc:sqlserver://xxxx.database.windows.net:1433;database=AdventureWorks;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
sql = "select 1 a into #foo from sys.objects"
sc._jvm.example.JDBCFacade.runStatement(jdbcUrl,sql, jdbcUsername, jdbcPassword)
推荐阅读
- sql - SSMS - 在编辑 200 行模式下搜索的快捷方式是什么?
- python - 按字母顺序对 dtype 进行排序
- oracle - 我收到与编码相关的错误
- .htaccess - htaccess 重写 url 以覆盖连字符后的字符
- dictionary - Flutter - 可滚动布局(带地图的表单)
- javascript - 如何为移动设备禁用我的 javascript
- sql - 仅输出行的超集
- angular - 在 Angular 6 中实现区域类型图表 Highcharts
- python - 带有标准输入和标准输出的子进程 Popen --- 总是挂断
- python - How to change background colour of Figure object in Tkinter