首页 > 解决方案 > 如何在pyspark数据框中读取scylladb表

问题描述

我正在尝试将安装了一台 pc 的 scylladb 表读取到另一台 pc 上的 pyspark 数据帧中。
2 台电脑有 ssh 连接,我可以通过 python 代码读取表格,只有在连接 spark 时才会出现问题。我使用了这个连接器:

--packages datastax:spark-cassandra-connector:2.3.0-s_2.11 , 

我的 spark -version = 2.3.1 ,scala-version-2.11.8。

**First Approach**
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
conf = SparkConf().set("spark.cassandra.connection.host","192.168.0.118")
sc = SparkContext(conf = conf)
spark=SparkSession.builder.config(conf=conf).appName('FinancialRecon').getOrCreate()
sqlContext =SQLContext(sc)
data=spark.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()

结果错误:

文件“/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py”,第 172 行,在加载文件“/usr/local/spark/python/lib/py4j-0.10.7- src.zip/py4j/java_gateway.py”,第 1257 行,通话中 文件“/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第 63 行,在 deco 文件“/usr/local/spark/python/lib/py4j-0.10.7- src.zip/py4j/protocol.py",第 328 行,在 get_return_value py4j.protocol.Py4JJavaError:调用 o43.load 时出错。: java.lang.ClassNotFoundException: org.apache.spark.Logging 在 Spark 2.0 中被移除。请在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala) 的 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:646) 检查您的库是否与 Spark 2.0 兼容:190) 在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl。

我使用的另一种方法是:

data=sc.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()

为此,我得到:

AttributeError:“SparkContext”对象没有“读取”属性

第三种方法:

data=sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()

为此,我得到与第一种方法相同的错误。

请告知是scylla spark连接器问题还是一些火花库问题以及如何解决。

标签: apache-sparkcassandrascylla

解决方案


按着这些次序 :

1.使用 packages 行运行 spark-shell。使用 --conf 配置默认的 Spark 配置传递键值对,在我的情况下,scylla 主机是 172.17.0.2

bin/spark-shell --conf spark.cassandra.connection.host=172.17.0.2 --packages datastax:spark-cassandra-connector:2.3.0-s_2.11

2.在 SparkContext、SparkSession、RDD 和 DataFrame 上启用 Cassandra 特定功能:

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

3.从scylla加载数据

val rdd = sc.cassandraTable("my_keyspace", "my_table")

4.测试

scala> rdd.collect().foreach(println)
CassandraRow{id: 1, name: ash}

推荐阅读