Upsert with spark-mongo

Problem description

My collection configuration:

{ "_id" : "db_1.target_collection", "lastmodEpoch" : ObjectId("6076a37e37c2cca5853da6df"), "lastmod" : ISODate("1970-02-19T17:02:47.301Z"), "dropped" : false, "key" : { "kfuin" : "hashed" }, "unique" : false, "uuid" : UUID("57c30bbe-af83-4410-a51f-c04f3c7522f4") }

I want to read from mongo and write updates back to mongo:

df.write.format('com.mongodb.spark.sql') \
    .option('collection', 'target_collection') \
    .option('replaceDocument', 'false') \
    .option('shardKey', '{kfuin: 1}') \
    .mode('append') \
    .save()

When replaceDocument is true, I get this exception when trying to upsert:

com.mongodb.MongoBulkWriteException: Bulk write operation error on server ... message='After applying the update, the (immutable) field '_id' was found to have been altered to _id: ObjectId('5f80331981f3601291e04a1c')'

When replaceDocument is false:

Performing an update on the path '_id' would modify the immutable field '_id'.

Any ideas?

Tags: mongodb scala apache-spark

Solution


If you want to update existing documents, the shardKey option also needs to include "_id":

.option('shardKey', '{kfuin: 1,_id: 1}') 
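With that change, the asker's write call becomes the following (a sketch assuming the same connector and a `df` already read from the source collection):

```python
# Upsert into a collection sharded on {kfuin: "hashed"}.
# Adding _id to shardKey makes the connector match each document by
# (kfuin, _id), so _id lands in the query filter instead of the $set
# payload and the immutable-field error goes away.
df.write.format('com.mongodb.spark.sql') \
    .option('collection', 'target_collection') \
    .option('replaceDocument', 'false') \
    .option('shardKey', '{kfuin: 1, _id: 1}') \
    .mode('append') \
    .save()
```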

For reference, the MongoSpark.save method from the connector source:

def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
    val mongoConnector = MongoConnector(writeConfig.asOptions)
    val dataSet = dataset.toDF()
    val mapper = rowToDocumentMapper(dataSet.schema)
    val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
    val fieldNames = dataset.schema.fieldNames.toList
    val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList

    if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
      MongoSpark.save(documentRdd, writeConfig)
    } else {
      documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
        mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
          iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
            val requests = batch.map(doc =>
              if (queryKeyList.forall(doc.containsKey(_))) {
                val queryDocument = new BsonDocument()
                queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
                if (writeConfig.replaceDocument) {
                  new ReplaceOneModel[BsonDocument](queryDocument, doc, new ReplaceOptions().upsert(true))
                } else {
                  queryDocument.keySet().asScala.foreach(doc.remove(_))
                  new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), new UpdateOptions().upsert(true))
                }
              } else {
                new InsertOneModel[BsonDocument](doc)
              })
            collection.bulkWrite(requests.toList.asJava)
          })
        })
      })
    }
  }
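The branch that matters is the `UpdateOneModel` one: the connector builds the match query from the configured shard-key fields, strips those fields out of the document, and issues a `$set` upsert. A minimal Python sketch of that per-document logic (hypothetical field names, mirroring the Scala above; it assumes every query key is present in each document, otherwise the connector falls back to a plain insert):

```python
def build_upserts(docs, query_keys):
    """Mirror of the connector's UpdateOneModel branch: match on the
    shard-key fields, remove them from the payload, $set the rest."""
    requests = []
    for doc in docs:
        doc = dict(doc)  # don't mutate the caller's document
        query = {k: doc.pop(k) for k in query_keys}
        requests.append((query, {"$set": doc}))  # (filter, update) for an upsert
    return requests

doc = {"_id": 1, "kfuin": 42, "status": "ok"}

# shardKey = {kfuin: 1}: _id stays inside $set, so the server rejects
# the write with "would modify the immutable field '_id'".
flt, upd = build_upserts([doc], ["kfuin"])[0]
# flt == {"kfuin": 42}, upd == {"$set": {"_id": 1, "status": "ok"}}

# shardKey = {kfuin: 1, _id: 1}: _id moves into the filter and the
# update only touches the remaining fields.
flt, upd = build_upserts([doc], ["kfuin", "_id"])[0]
# flt == {"kfuin": 42, "_id": 1}, upd == {"$set": {"status": "ok"}}
```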
