首页 > 解决方案 > 从 spark 更新 cassandra

问题描述

我是 cassandratfm.foehis中有数据的表。

当我第一次将数据从 spark 传输到 cassandra 时,我使用了这组命令:

import org.apache.spark.sql.functions._
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

val wkdir="/home/adminbigdata/tablas/"
val fileIn= "originales/22_FOEHIS2.csv"
val fileOut= "22_FOEHIS_PRE2"
val fileCQL= "22_FOEHISCQL"

val data = sc.textFile(wkdir + fileIn).filter(!_.contains("----")).map(_.trim.replaceAll(" +", "")).map(_.dropRight(1)).map(_.drop(1)).map(_.replaceAll(",", "")).filter(array => array(6) != "MOBIDI").filter(array => array(17) != "").saveAsTextFile(wkdir + fileOut)
val firstDF = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("mode", "DROPMALFORMED").option("delimiter", "|").load(wkdir + fileOut)
val columns: Array[String] = firstDF.columns
val reorderedColumnNames: Array[String] = Array("hoclic","hodtac","hohrac","hotpac","honrac","hocdan","hocdrs","hocdsl","hocol","hocpny","hodesf","hodtcl","hodtcm","hodtea","hodtra","hodtrc","hodtto","hodtua","hohrcl","hohrcm","hohrea","hohrra","hohrrc","hohrua","holinh","holinr","honumr","hoobs","hooe","hotdsc","hotour","housca","houscl","houscm","housea","houser","housra","housrc")
val secondDF= firstDF.select(reorderedColumnNames.head, reorderedColumnNames.tail: _*)
secondDF.write.cassandraFormat("foehis", "tfm").save()

但是当我使用相同的脚本加载新数据时,我得到了错误。我不知道怎么了?这是消息:

java.lang.UnsupportedOperationException: 'SaveMode is set to ErrorIfExists and Table
tfm.foehis already exists and contains data.
Perhaps you meant to set the DataFrame write mode to Append?
Example: df.write.format.options.mode(SaveMode.Append).save()" '

标签: apache-sparkcassandraspark-cassandra-connector

解决方案


错误消息清楚地表明您需要使用附加模式并显示您可以使用它做什么。在您的情况下,这是因为目标表已经存在,并且写入模式设置为“如果存在则错误”。如果你还想写数据,代码应该如下:

import org.apache.spark.sql.SaveMode
secondDF.write.cassandraFormat("foehis", "tfm").mode(SaveMode.Append).save()

推荐阅读