mongodb - 使用 spark-mongo 进行更新插入
问题描述
我的收藏配置
{“_id”:“db_1.target_collection”,“lastmodEpoch”:ObjectId(“6076a37e37c2cca5853da6df”),“lastmod”:ISODate(“1970-02-19T17:02:47.301Z”),“dropped”:false,“key " : { "kfuin" : "hashed" }, "unique" : false, "uuid" : UUID("57c30bbe-af83-4410-a51f-c04f3c7522f4") }
我想从 mongo 读取并更新到 mongo
df.write.format('com.mongodb.spark.sql') \
.option('collection', 'target_collection') \
.option('replaceDocument', 'false') \
.option('shardKey', '{kfuin: 1}') \
.mode('append') \
.save()
当我在 replaceDocument 为真时尝试更新插入时出现此异常
com.mongodb.MongoBulkWriteException: Bulk write operation error on server ... message='应用更新后,发现(不可变)字段'_id'已更改为_id: ObjectId('5f80331981f3601291e04a1c')
当 replaceDocument 为假时
对路径“_id”执行更新将修改不可变字段“_id”。
有任何想法吗?
解决方案
解决
如果要更新数据,shardKey需要添加“_id”
.option('shardKey', '{kfuin: 1,_id: 1}')
MongoSpark.save 方法:
def save[D](dataset: Dataset[D], writeConfig: WriteConfig): Unit = {
val mongoConnector = MongoConnector(writeConfig.asOptions)
val dataSet = dataset.toDF()
val mapper = rowToDocumentMapper(dataSet.schema)
val documentRdd: RDD[BsonDocument] = dataSet.rdd.map(row => mapper(row))
val fieldNames = dataset.schema.fieldNames.toList
val queryKeyList = BsonDocument.parse(writeConfig.shardKey.getOrElse("{_id: 1}")).keySet().asScala.toList
if (writeConfig.forceInsert || !queryKeyList.forall(fieldNames.contains(_))) {
MongoSpark.save(documentRdd, writeConfig)
} else {
documentRdd.foreachPartition(iter => if (iter.nonEmpty) {
mongoConnector.withCollectionDo(writeConfig, { collection: MongoCollection[BsonDocument] =>
iter.grouped(writeConfig.maxBatchSize).foreach(batch => {
val requests = batch.map(doc =>
if (queryKeyList.forall(doc.containsKey(_))) {
val queryDocument = new BsonDocument()
queryKeyList.foreach(key => queryDocument.append(key, doc.get(key)))
if (writeConfig.replaceDocument) {
new ReplaceOneModel[BsonDocument](queryDocument, doc, new ReplaceOptions().upsert(true))
} else {
queryDocument.keySet().asScala.foreach(doc.remove(_))
new UpdateOneModel[BsonDocument](queryDocument, new BsonDocument("$set", doc), new UpdateOptions().upsert(true))
}
} else {
new InsertOneModel[BsonDocument](doc)
})
collection.bulkWrite(requests.toList.asJava)
})
})
})
}
}
推荐阅读
- sql-server - 在子查询中聚合按位或*分区*?
- python-3.x - 如何使用 pygerrit2 REST api 获取当前的补丁集信息?
- postgresql - 如何清除托管在 aws rds 中的 postgres 中的缓冲区缓存?
- android - Android JetpackCompose 中的 ComponentActivity 与 AppCompactActivity
- azure - Bot Framework Composer 错误:无法从状态获取 KnowledgeBaseId 的值
- java - 如何停止新活动中的音乐?
- jquery-ui - 使用 jQuery 可拖动时,为什么我的 div 没有对齐到所有方向?
- email - 如何在 Jenkins 中为 SMTP 启用 TLSv1.2
- javascript - 有没有办法让 Angular 9+ 生命周期钩子在每次从 DOM 中添加或删除组件时触发?
- sql-server - PDI 通过 VPN 连接数据库