apache-spark - 无法从 pyspark 的 cassandra 数据库加载信息
问题描述
我有这个代码:
import os
from pyspark import SparkContext,SparkFiles,SQLContext,SparkFiles
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import col
secure_bundle_file=os.getcwd()+'\\secure-connect-dbtest.zip'
sparkSession =SparkSession.builder.appName('SparkCassandraApp')\
.config('spark.cassandra.connection.config.cloud.path',secure_bundle_file)\
.config('spark.cassandra.auth.username', 'test')\
.config('spark.cassandra.auth.password','testquart')\
.config('spark.dse.continuousPagingEnabled',False)\
.master('local[*]').getOrCreate()
data = sparkSession.read.format("org.apache.spark.sql.cassandra")\
.options(table="tbthesis", keyspace="test").load()
data.count()
我尝试做的是连接到我的数据库并检索我的数据。该代码很好地连接到数据库,但是一旦到达读取行,它就会说:
Exception has occurred: Py4JJavaError
An error occurred while calling o48.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra.
Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:674)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:728)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
有人能帮助我吗?
另外,我想添加有关此代码的更多详细信息:
我想要做的是测试 spark 持续多久从我的数据库中读取 200 万条记录,普通的 python-cassandra 驱动程序在大约 1 小时内读取 200 万条记录(使用 SimpleStatement)所以在这里我想知道它会最后使用火花与那些 2 M 记录。
谢谢
解决方案
您的类路径中没有 Spark Cassandra 连接器包,因此它找不到相应的类。
您需要spark-submit
从pyspark
.--packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1
如果您真的只想从 python 代码中执行此操作,那么您可以尝试.config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.11:2.5.1")
在创建时添加SparkSession
,但如果类路径已经实例化,它可能并不总是有效。
PS Spark 通常应该会跑赢SimpleStatement
,即使在本地模式下也是如此,尽管 Spark 在分布式模式下确实大放异彩。您真的不应该使用SimpleStatement
for 重复执行仅在参数上有所不同的查询 - 您应该为此使用准备好的语句。请阅读使用 DataStax 驱动程序开发应用程序指南。DataStax 还赠送了Cassandra 的第 3 版。权威指南书——刚刚出版——我建议阅读它。
推荐阅读
- python-3.x - 如何在 python 中计算 2、2D kde 图之间的公共体积/交集?
- javascript - 用户登录后如何打开侧边栏选项卡
- macos - cmake:如何在 macOS 上的应用程序包中找到二进制文件?
- django - Pinax npm 安装失败
- c# - C#:无法从 JSON 字符串中获取子值
- python - Odoo划分两个浮点字段 - AttributeError
- java - 在 Windows 上使用 JPackage 是否可以让 main .exe 不作为控制台运行,但让其他 laucher 作为控制台运行?
- c# - 以编程方式获取 Azure 应用服务本地磁盘使用情况
- azure - 令牌在 Blazor WebAssembly 中过期时无法捕获 CosmosException
- c++ - GCC/Clang 分歧:是否
匹配 在 C++17 模板模板替换中?