apache-spark - 如何在pyspark数据框中读取scylladb表
问题描述
我正在尝试将安装了一台 pc 的 scylladb 表读取到另一台 pc 上的 pyspark 数据帧中。
2 台电脑有 ssh 连接,我可以通过 python 代码读取表格,只有在连接 spark 时才会出现问题。我使用了这个连接器:
--packages datastax:spark-cassandra-connector:2.3.0-s_2.11 ,
我的 spark -version = 2.3.1 ,scala-version-2.11.8。
**First Approach**
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
conf = SparkConf().set("spark.cassandra.connection.host","192.168.0.118")
sc = SparkContext(conf = conf)
spark=SparkSession.builder.config(conf=conf).appName('FinancialRecon').getOrCreate()
sqlContext =SQLContext(sc)
data=spark.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()
结果错误:
文件“/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py”,第 172 行,在加载文件“/usr/local/spark/python/lib/py4j-0.10.7- src.zip/py4j/java_gateway.py”,第 1257 行,通话中 文件“/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py”,第 63 行,在 deco 文件“/usr/local/spark/python/lib/py4j-0.10.7- src.zip/py4j/protocol.py",第 328 行,在 get_return_value py4j.protocol.Py4JJavaError:调用 o43.load 时出错。: java.lang.ClassNotFoundException: org.apache.spark.Logging 在 Spark 2.0 中被移除。请在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala) 的 org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:646) 检查您的库是否与 Spark 2.0 兼容:190) 在 org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164) 在 sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 在 sun.reflect.NativeMethodAccessorImpl。
我使用的另一种方法是:
data=sc.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()
为此,我得到:
AttributeError:“SparkContext”对象没有“读取”属性
第三种方法:
data=sqlContext.read.format("org.apache.spark.sql.cassandra").options(table="datarecon",keyspace="finrecondata").load().show()
为此,我得到与第一种方法相同的错误。
请告知是scylla spark连接器问题还是一些火花库问题以及如何解决。
解决方案
按着这些次序 :
1.使用 packages 行运行 spark-shell。使用 --conf 配置默认的 Spark 配置传递键值对,在我的情况下,scylla 主机是 172.17.0.2
bin/spark-shell --conf spark.cassandra.connection.host=172.17.0.2 --packages datastax:spark-cassandra-connector:2.3.0-s_2.11
2.在 SparkContext、SparkSession、RDD 和 DataFrame 上启用 Cassandra 特定功能:
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._
3.从scylla加载数据
val rdd = sc.cassandraTable("my_keyspace", "my_table")
4.测试
scala> rdd.collect().foreach(println)
CassandraRow{id: 1, name: ash}
推荐阅读
- timer - JMeter 常量计时器未按预期运行
- django - 多次选择时如何防止订单在结账界面重复显示
- javascript - 在 Chart.js MVC c# 中使用模型中的数据
- javascript - 如何将简单字符串从本地 HTML 传输到 Swift-App?
- python - 语法错误“打破循环”
- powershell - 我可以使用 powershell 用于显示某些列的机制吗?
- inno-setup - Inno Setup 中的通配符(测试固定字符串前缀后是否有任何值)
- java - Spring UriComponentsBuilder 逃脱了“?” 而不是相同的,它给了我“?”的unicode值
- nativescript - 如何添加 iOS 原生库并在 nativescript 上使用?
- amazon-web-services - 从 AWS SDK 从 AWS EC2 访问 S3,无需访问 ID 和密钥