How to do bulk inserts into SQL Server with the Spark connector in Databricks?

Problem

I have a DataFrame in Databricks that I am trying to bulk insert into SQL Server. I followed this tutorial on Microsoft's website, specifically using this code:


# df is created as a DataFrame with 1000 rows of sample data

server_name = "jdbc:sqlserver://x.database.windows.net"
database_name = "dbTest"
url = server_name + ";" + "databaseName=" + database_name + ";"

table_name = "dbo.Bulk"
username = "user123"
password = "Password123"

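# Write the DataFrame with the Microsoft SQL Spark connector;
# "overwrite" mode replaces the target table with the DataFrame's contents.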
df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .save()
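One way to rule out the connection details is to write through Spark's built-in JDBC source instead, which bypasses the bulk-copy connector entirely (a minimal sketch reusing the variables above; plain JDBC inserts are slower than a bulk copy):

# Sanity-check write using Spark's generic JDBC source rather than
# com.microsoft.sqlserver.jdbc.spark; this does not exercise the
# connector code path that raises the NoSuchMethodError below.
df.write \
    .format("jdbc") \
    .option("url", url) \
    .option("dbtable", table_name) \
    .option("user", username) \
    .option("password", password) \
    .mode("overwrite") \
    .save()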

However, the write through the bulk connector fails with the following error:

java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.schemaString(Lorg/apache/spark/sql/Dataset;Ljava/lang/String;Lscala/Option;)Ljava/lang/String;

The more detailed error log:

Py4JJavaError                             Traceback (most recent call last)
<command-2622503877398381> in <module>
      7 password = "********"
      8 
----> 9 df_countries.write \
     10             .format("com.microsoft.sqlserver.jdbc.spark") \
     11             .mode("overwrite") \

/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
   1132             self.format(format)
   1133         if path is None:
-> 1134             self._jwrite.save()
   1135         else:
   1136             self._jwrite.save(path)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1302 
   1303         answer = self.gateway_client.send_command(command)
-> 1304         return_value = get_return_value(
   1305             answer, self.gateway_client, self.target_id, self.name)
   1306 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    108     def deco(*a, **kw):
    109         try:
--> 110             return f(*a, **kw)
    111         except py4j.protocol.Py4JJavaError as e:
    112             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1515.save.
: java.lang.NoSuchMethodError: org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.schemaString(Lorg/apache/spark/sql/Dataset;Ljava/lang/String;Lscala/Option;)Ljava/lang/String;
    at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.mssqlCreateTable(BulkCopyUtils.scala:506)
    at com.microsoft.sqlserver.jdbc.spark.SingleInstanceConnector$.createTable(SingleInstanceConnector.scala:46)
    at com.microsoft.sqlserver.jdbc.spark.Connector.write(Connector.scala:90)
    at com.microsoft.sqlserver.jdbc.spark.DefaultSource.createRelation(DefaultSource.scala:64)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:73)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:71)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:94)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:196)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:240)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:165)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:236)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:192)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:163)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:162)
    at org.apache.spark.sql.DataFrameWriter.$anonfun$runCommand$1(DataFrameWriter.scala:1079)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:1079)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:468)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:438)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:311)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
    at py4j.Gateway.invoke(Gateway.java:295)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:251)
    at java.lang.Thread.run(Thread.java:748)

What am I doing wrong?

Tags: sql-server, apache-spark, pyspark, databricks, azure-databricks

Solution


I had the same problem.

The problem started when we wanted to use the latest cluster runtime (8.3 at the time of writing). Going back to Runtime 7 made it work.

According to the Databricks Runtime 8 migration guide: "Databricks Runtime 8.0 changes the default format to delta to make it simpler to create a Delta table." - as opposed to parquet.

https://docs.databricks.com/release-notes/runtime/8.0-migration.html
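Concretely, on Runtime 8 a bare saveAsTable now produces a Delta table unless a format is given explicitly (a minimal sketch; the table names here are made up for illustration):

# On Databricks Runtime 8.x this creates a Delta table by default;
# on Runtime 7.x the same call created a parquet table.
df.write.saveAsTable("dbTest.sample_default")

# Forcing the pre-8.0 behavior explicitly for a single write:
df.write.format("parquet").saveAsTable("dbTest.sample_parquet")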

I figured that since the error only occurs on Runtime 8, this change might be the cause, but I tried implementing option 1 (adding the two configs to the cluster, sketched below) and the error still occurs.
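For reference, a sketch of what I tried, assuming these are the two settings the migration guide's option 1 refers to (I added them to the cluster's Spark config; they can also be set per session as shown here):

# Assumed config keys from the Runtime 8 migration guide; setting them
# is meant to restore parquet as the default format for table creation.
spark.conf.set("spark.sql.sources.default", "parquet")
spark.conf.set("spark.sql.legacy.createHiveTableByDefault", "true")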

I guess for now we will stay on Runtime 7, but at some point we will need to upgrade.

