首页 > 解决方案 > 使用java spark将数据集保存到cassandra

问题描述

我正在尝试使用 java spark 将数据集保存到 cassandra db。我可以使用下面的代码成功地将数据读入数据集

Dataset<Row> readdf = sparkSession.read().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.load();

但是当我尝试编写数据集时,我得到IOException: Could not load or find table, found similar tables in keyspace

Dataset<Row> dfwrite= readdf.write().format("org.apache.spark.sql.cassandra")
.option("keyspace","dbname")
.option("table","tablename")
.save();

我在 sparksession 中设置主机和端口问题是我能够以覆盖和附加模式写入但无法创建表

我正在使用的版本如下: spark java 2.0 spark cassandra connector 2.3

尝试了不同的 jar 版本但没有任何效果我也经历了不同的堆栈溢出和 github 链接

任何帮助是极大的赞赏。

标签: javaapache-sparkcassandraspark-cassandra-connector

解决方案


Spark 中的write操作没有自动为您创建表的模式 - 有多种原因。其中之一是您需要为您的表定义一个主键,否则,如果您设置了错误的主键,您可能只会覆盖数据。因此,Spark Cassandra 连接器提供了一种单独的方法来根据您的数据帧结构创建表,但您需要提供分区和集群键列的列表。在 Java 中,它将如下所示(完整代码在此处):

DataFrameFunctions dfFunctions = new DataFrameFunctions(dataset);
Option<Seq<String>> partitionSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("part")).seq());
Option<Seq<String>> clusteringSeqlist = new Some<>(JavaConversions.asScalaBuffer(
          Arrays.asList("clust", "col2")).seq());
CassandraConnector connector = new CassandraConnector(
          CassandraConnectorConf.apply(spark.sparkContext().getConf()));
dfFunctions.createCassandraTable("test", "widerows6",
          partitionSeqlist, clusteringSeqlist, connector);

然后你可以像往常一样写数据:

dataset.write()
   .format("org.apache.spark.sql.cassandra")
   .options(ImmutableMap.of("table", "widerows6", "keyspace", "test"))
   .save();

推荐阅读