sql-server - 从 Pyspark 2.4 连接 SQL Server 以写入数据时出现问题
问题描述
我正在使用 Pyspark 2.4,想将数据写入 SQL Server,但它不起作用。
我已将从此处下载的 jar 文件放在spark 路径中:
D:\spark-2.4.3-bin-hadoop2.7\spark-2.4.3-bin-hadoop2.7\jars\
但是,无济于事。以下是将数据写入 SQL Server 的 pyspark 代码。
sql_server_dtls = {'user': 'john', 'password': 'doe'}
ports_budget_joined_DF.write.jdbc(url="jdbc:sqlserver://endpoint:1433;databaseName=poc", table='dbo.test_tmp', mode='overwrite', properties=sql_server_dtls)
出现以下错误:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\sql\readwriter.py", line 982, in jdbc
self.mode(mode)._jwrite.jdbc(url, table, jprop)
File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\java_gateway.py", line 1257, in __call__
File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\sql\utils.py", line 63, in deco
return f(*a, **kw)
File "C:\Users\aakash.basu\AppData\Local\Programs\Python\Python37-32\lib\site-packages\pyspark\python\lib\py4j-0.10.7-src.zip\py4j\protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o45.jdbc.
: java.sql.SQLException: No suitable driver
我错过了什么吗?另外,我想在将新数据写入之前先截断表。DF 编写器中的 mode='overwrite' 是否也为 SQL Server 目标系统处理相同的问题?
解决方案
您只需要com.mysql.cj.jdbc.Driver
,Spark 可以自动将其下载到它正在寻找的任何目录中。
使用这个功能:
def connect_to_sql(
spark, jdbc_hostname, jdbc_port, database, data_table, username, password
):
jdbc_url = "jdbc:mysql://{0}:{1}/{2}".format(jdbc_hostname, jdbc_port, database)
connection_details = {
"user": username,
"password": password,
"driver": "com.mysql.cj.jdbc.Driver",
}
df = spark.read.jdbc(url=jdbc_url, table=data_table, properties=connection_details)
return df
添加:
您可以使用以下功能(您可以根据自己的需要对其进行编辑)在声明您的sparkSession()
. 您可以在列表中传递包的工件 ID,也可以作为逗号分隔的字符串传递。您可以从中央存储库中获取它们
def create_spark_session(master_url, packages=None):
"""
Creates a spark session
:param master_url: IP address of the cluster you want to submit the job to or local with all cores
:param packages: Any external packages if needed, only when called. This variable could be a string of the package
specification or a list of package specifications.
:return: spark session object
"""
if packages:
packages = ",".join(packages) if isinstance(packages, list) else packages
spark = (
SparkSession.builder.master(master_url)
.config("spark.io.compression.codec", "snappy")
.config("spark.ui.enabled", "false")
.config("spark.jars.packages", packages)
.getOrCreate()
)
else:
spark = (
SparkSession.builder.master(master_url)
.config("spark.io.compression.codec", "snappy")
.config("spark.ui.enabled", "false")
.getOrCreate()
)
return spark
推荐阅读
- r - 如何根据 geom_bar 中的 bar 值对 bar 进行排序?
- node.js - 尝试运行 Typescript 模拟 API 服务器时创建 React-App(模块加载错误)
- python-3.x - 执行 TFMA 时出现 TFX 管道错误:AttributeError:“NoneType”对象没有属性“ToBatchTensors”
- graph - Neo4J 浏览器问题
- arrays - 为什么函数不能接受数组别名
- php - Laravel 在部署时路由到子文件夹
- node.js - Content-Type: application/pdf 更改为 Content-Type: application/octet-stream
- android - 当我尝试使用凭证 api 直接保存时,我收到此错误消息
- rest - Marketo REST API - 批准使用 REST API 创建的自定义活动类型 - 611 系统错误
- r - R knit to PDF 无法正常工作并出现错误