sql-server - 使用 Pyspark 将 SQL 查询从 DataBricks 发送到 SQL Server
问题描述
将自定义 SQL 查询发送到 Python 上的 SQL 数据库非常简单。
connection = mysql.connector.connect(host='localhost',
database='Electronics',
user='pynative',
password='pynative@#29')
sql_select_Query = "select * from Laptop" #any custom sql statement not particularly select statement
cursor = connection.cursor()
cursor.execute(sql_select_Query)
records = cursor.fetchall()
但是,我在互联网上搜索了 Databricks 上的类似任务,但没有找到任何解决方案。值得一提的是,我可以使用 JDBC 读取和写入 SQL Server 数据库,但我想发送一个自定义 SQL 语句,例如我想在 SQL Server 数据库中执行的“批量插入”语句。
这是我使用 JDBC 从 SQL Server 读取数据的方法。
table_name="dbo.myTable"
spark.read.jdbc(url=jdbcUrl, table=table_name, properties=connectionProperties)
解决方案
请参考此文档:使用 JDBC 的 SQL 数据库:
Databricks Runtime 包含适用于 Microsoft SQL Server 和 Azure SQL 数据库的 JDBC 驱动程序。有关 Databricks 运行时中包含的 JDBC 库的完整列表,请参阅 Databricks 运行时发行说明。
- 本文介绍如何使用 DataFrame API 通过 JDBC 连接到 SQL 数据库,以及如何通过 JDBC 接口控制读取的并行性。本文提供了使用 Scala API 的详细示例,最后附有缩写的 Python 和 Spark SQL 示例。有关使用 JDBC 连接到 SQL 数据库的所有受支持参数,请参阅JDBC To Other Databases。
Python 示例:
jdbcHostname = "<hostname>"
jdbcDatabase = "employees"
jdbcPort = 1433
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2};user={3};password={4}".format(jdbcHostname, jdbcPort, jdbcDatabase, username, password)
pushdown_query = "(select * from employees where emp_no < 10008) emp_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)
但是传统的 jdbc 连接器使用逐行插入将数据写入数据库。可以使用 Spark 连接器通过批量插入将数据写入 Azure SQL 和 SQL Server。在加载大型数据集或将数据加载到使用列存储索引的表中时,它显着提高了写入性能。
import com.microsoft.azure.sqldb.spark.bulkcopy.BulkCopyMetadata
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.connect._
/**
Add column Metadata.
If not specified, metadata is automatically added
from the destination table, which may suffer performance.
*/
var bulkCopyMetadata = new BulkCopyMetadata
bulkCopyMetadata.addColumnMetadata(1, "Title", java.sql.Types.NVARCHAR, 128, 0)
bulkCopyMetadata.addColumnMetadata(2, "FirstName", java.sql.Types.NVARCHAR, 50, 0)
bulkCopyMetadata.addColumnMetadata(3, "LastName", java.sql.Types.NVARCHAR, 50, 0)
val bulkCopyConfig = Config(Map(
"url" -> "mysqlserver.database.windows.net",
"databaseName" -> "MyDatabase",
"user" -> "username",
"password" -> "*********",
"dbTable" -> "dbo.Clients",
"bulkCopyBatchSize" -> "2500",
"bulkCopyTableLock" -> "true",
"bulkCopyTimeout" -> "600"
))
df.bulkCopyToSqlDB(bulkCopyConfig, bulkCopyMetadata)
//df.bulkCopyToSqlDB(bulkCopyConfig) if no metadata is specified.
参考:使用 Spark 连接器
HTH。
推荐阅读
- python - 将数据框作为 Mongo 的值的字典;.to_dict 不是一个选项,有什么方法可以转换吗?
- ios - 无法上传图片(React Native,Laravel)
- node.js - HTTPS AWS Elastic Beanstalk
- c++ - 如果“所有者”,C ++类析构函数删除成员?
- windows - Powershell ICACLS 更改文件的权限
- csv - WSO2:将 CSV 消息转换为 json wso2 esb 。序言中出现意外字符'"'(代码34)失败;预期'<'
- sapui5 - Fiori 中语义对象的用途
- csrf - 如何保护 Auth0 认证的 REST 服务免受 XSRF 和会话劫持?
- javascript - 我正在接受挑战
- python - FastICA 的问题,修改一个独立的源会改变所有的输出