java - 使用java spark将数据集保存到cassandra
问题描述
我正在尝试使用 java spark 将数据集保存到 cassandra db。我可以使用下面的代码成功地将数据读入数据集
Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();
但是当我尝试编写数据集时,我得到IOException: Could not load or find table, found similar tables in keyspace
Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();
我在 sparksession 中设置主机和端口问题是我能够以覆盖和附加模式写入但无法创建表
我正在使用的版本如下: spark java 2.0 spark cassandra connector 2.3
尝试了不同的 jar 版本但没有任何效果我也经历了不同的堆栈溢出和 github 链接
任何帮助是极大的赞赏。
解决方案
Spark 中的write
操作没有自动为您创建表的模式 - 有多种原因。其中之一是您需要为您的表定义一个主键,否则,如果您设置了错误的主键,您可能只会覆盖数据。因此,Spark Cassandra 连接器提供了一种单独的方法来根据您的数据帧结构创建表,但您需要提供分区和集群键列的列表。在 Java 中,它将如下所示(完整代码在此处):
DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
Arrays.asList("part")).seq());
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
Arrays.asList("clust", "col2")).seq());
CassandraConnector connector = new CassandraConnector(
CassandraConnectorConf.apply(spark.sparkContext().getConf()));
dfFunctions.createCassandraTable("test", "widerows6",
partitionSeqlist, clusteringSeqlist, connector);
然后你可以像往常一样写数据:
dataset.write()
.format("org.apache.spark.sql.cassandra")
.options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
.save();
推荐阅读
- c++ - STL 是否提供了一些东西来查找由两个非反向迭代器定义的范围内谓词为真的最后一个元素?
- openlayers-6 - 找出所有已成功加载到图层的要素的事件
- arrays - 如何使用用户定义的索引值在飞镖中创建一个列表?
- aspnetboilerplate - 将 Worker Service 项目添加到 abp.io 解决方案
- android - GridView 适配器出现问题
- c# - 正则表达式
- php - php 正则表达式检查字符串是否以包含语言代码的特定字符串开头
- ios - ld:架构 arm64 的 81 个重复符号
- sql - 如何使用 Oracle SQL 生成以下序列?
- php - 使用 PHP 变量动态设置多个文本框名称并从中获取值?