首页 > 解决方案 > 从 pyspark 本地机器连接到雪花时出现 Classnotfound 错误

问题描述

我正在尝试从本地机器上的 Pyspark 连接到雪花。

我的代码如下所示。

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext
    from pyspark.sql.types import *
    from pyspark import SparkConf, SparkContext

    sc = SparkContext("local", "sf_test")
    spark = SQLContext(sc)
    spark_conf = SparkConf().setMaster('local').setAppName('sf_test')

    sfOptions = {
      "sfURL" : "someaccount.some.address",
      "sfAccount" : "someaccount",
      "sfUser" : "someuser",
      "sfPassword" : "somepassword",
      "sfDatabase" : "somedb",
      "sfSchema" : "someschema",
      "sfWarehouse" : "somedw",
      "sfRole" : "somerole",
    }

SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

运行这段特定的代码时出现错误。

df = spark.read.format(SNOWFLAKE_SOURCE_NAME).options(**sfOptions).option("query","""select * from 
 "PRED_ORDER_DEV"."SALES"."V_PosAnalysis" pos 
    ORDER BY pos."SAPAccountNumber", pos."SAPMaterialNumber" """).load()

Py4JJavaError:调用 o115.load 时出错。:java.lang.ClassNotFoundException:找不到数据源:net.snowflake.spark.snowflake。 请在 org.apache 的 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657) 的http://spark.apache.org/third-party-projects.html中找到包 。 spark.sql.DataFrameReader.load(DataFrameReader.scala:194) 在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)

我已经加载了连接器和 jdbc jar 文件并将它们添加到 CLASSPATH

pyspark --packages net.snowflake:snowflake-jdbc:3.11.1,net.snowflake:spark-snowflake_2.11:2.5.7-spark_2.4
CLASSPATH = C:\Program Files\Java\jre1.8.0_241\bin;C:\snowflake_jar

我希望能够连接到雪花并使用 Pyspark 读取数据。任何帮助将非常感激!

标签: pysparksnowflake-cloud-data-platform

解决方案


下面的工作脚本。

您应该在项目的根目录中创建目录 jar 并添加两个 jar:

  • snowflake-jdbc-3.13.4.jar (jdbc 驱动)
  • spark-snowflake_2.12-2.9.0-spark_3.1.jar(火花连接器)。

接下来你需要了解你的 scala 编译器版本是什么。我正在使用 PyCharm,所以双击 shift 并搜索“scala”。你会看到类似scala-compiler-2.12.10.jar 的东西。scala-compiler 版本的第一个数字(在我们的例子中是 2.12)应该与 spark 连接器的第一个数字(spark-snowflake_2.12 -2.9.0 -spark_3.1.jar)相同

在下载连接器之前检查 Scala 编译器版本

from pyspark.sql import SparkSession

sfOptions = {
    "sfURL": "sfURL",
    "sfUser": "sfUser",
    "sfPassword": "sfPassword",
    "sfDatabase": "sfDatabase",
    "sfSchema": "sfSchema",
    "sfWarehouse": "sfWarehouse",
    "sfRole": "sfRole",
}

spark = SparkSession.builder \
    .master("local") \
    .appName("snowflake-test") \
    .config('spark.jars', 'jar/snowflake-jdbc-3.13.4.jar,jar/spark-snowflake_2.12-2.9.0-spark_3.1.jar') \
    .getOrCreate()


SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"

df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "select * from some_table") \
    .load()

df.show()

推荐阅读