首页 > 解决方案 > 无法从 pyspark 的 cassandra 数据库加载信息

问题描述

我有这个代码:

import os
from pyspark import SparkContext,SparkFiles,SQLContext,SparkFiles
from pyspark.sql import SQLContext, SparkSession
from pyspark.sql.functions import col

secure_bundle_file=os.getcwd()+'\\secure-connect-dbtest.zip'
sparkSession =SparkSession.builder.appName('SparkCassandraApp')\
  .config('spark.cassandra.connection.config.cloud.path',secure_bundle_file)\
  .config('spark.cassandra.auth.username', 'test')\
  .config('spark.cassandra.auth.password','testquart')\
  .config('spark.dse.continuousPagingEnabled',False)\
  .master('local[*]').getOrCreate()

data = sparkSession.read.format("org.apache.spark.sql.cassandra")\
  .options(table="tbthesis", keyspace="test").load()
data.count()

我尝试做的是连接到我的数据库并检索我的数据。该代码很好地连接到数据库,但是一旦到达读取行,它就会说:

Exception has occurred: Py4JJavaError
An error occurred while calling o48.load.
: java.lang.ClassNotFoundException: Failed to find data source: org.apache.spark.sql.cassandra. 
Please find packages at http://spark.apache.org/third-party-projects.html
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:674)
at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:728)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:203)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)

有人能帮助我吗?

另外,我想添加有关此代码的更多详细信息:

我想要做的是测试 spark 持续多久从我的数据库中读取 200 万条记录,普通的 python-cassandra 驱动程序在大约 1 小时内读取 200 万条记录(使用 SimpleStatement)所以在这里我想知道它会最后使用火花与那些 2 M 记录。

谢谢

标签: apache-sparkpysparkcassandraspark-cassandra-connector

解决方案


您的类路径中没有 Spark Cassandra 连接器包,因此它找不到相应的类。

您需要spark-submitpyspark.--packages com.datastax.spark:spark-cassandra-connector_2.11:2.5.1

如果您真的只想从 python 代码中执行此操作,那么您可以尝试.config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.11:2.5.1")在创建时添加SparkSession,但如果类路径已经实例化,它可能并不总是有效。

PS Spark 通常应该会跑赢SimpleStatement,即使在本地模式下也是如此,尽管 Spark 在分布式模式下确实大放异彩。您真的不应该使用SimpleStatementfor 重复执行仅在参数上有所不同的查询 - 您应该为此使用准备好的语句。请阅读使用 DataStax 驱动程序开发应用程序指南。DataStax 还赠送了Cassandra 的第 3 版。权威指南书——刚刚出版——我建议阅读它。


推荐阅读