首页 > 解决方案 > Spark Scala 将数据帧写入 MongoDB

问题描述

我正在尝试使用此作为指南将转换后的数据框写入 MongoDB

https://docs.mongodb.com/spark-connector/master/scala/streaming/

到目前为止,我从 MongoDB 读取数据帧的工作非常顺利。如下所示。

  val mongoURI = "mongodb://000.000.000.000:27017"
  val Conf = makeMongoURI(mongoURI,"blog","articles")
  val readConfigintegra: ReadConfig = ReadConfig(Map("uri" -> Conf))


  val sparkSess = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.output.uri", "mongodb://000.000.000.000:27017/blog.vectors")
    .getOrCreate()



  // Uses the ReadConfig
  val df3 = sparkSess.sqlContext.loadFromMongoDB(ReadConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.articles")))

然而,将这个数据框写入 MongoDB 似乎更加困难。

 //reads data from mongo and does some transformations
    val data = read_mongo()
    data.show(20,false)
    data.write.mode("append").mongo()

对于最后一行,我收到以下错误。

Exception in thread "main" java.lang.IllegalArgumentException: Missing database name. Set via the 'spark.mongodb.output.uri' or 'spark.mongodb.output.database' property

当我在上面的代码块中的 spark Session 中设置它时,这似乎让我感到困惑。

 val sparkSess = SparkSession.builder()
    .master("local")
    .appName("MongoSparkConnectorIntro")
    .config("spark.mongodb.output.uri", "mongodb://000.000.000.000:27017/blog.vectors")
    .getOrCreate()

你能发现我做错了什么吗?

标签: mongodbscalaapache-sparkapache-spark-sql

解决方案


我的答案与我阅读它的方式非常相似,但使用 writeConfig 代替。

data.saveToMongoDB(WriteConfig(Map("uri" -> "mongodb://000.000.000.000:27017/blog.vectors")))

推荐阅读