node.js - 一些插入后出错 MongoNetworkError: connection 812 to 127.0.0.1:27017 closed in Node js
问题描述
我想使用 Node js 脚本在 Mongo DB 中插入大约 6000 万个数据,所以我创建了一个连接并像这样重用它:
连接.js
const MongoClient = require("mongodb").MongoClient,
{ mongourl,dbName } = require('../../env');
let db;
let mongoobject;
const option = {
useUnifiedTopology: true,
useNewUrlParser: true,
socketTimeoutMS: 300000,
poolSize:1000,
keepAlive: 300000,
connectTimeoutMS: 300000,
};
const connectDb = (callback) => {
if (db) return callback()
MongoClient.connect( mongourl, option,
(err, database) => {
if (err) return console.log(err)
db = database.db(dbName);
mongoobject = database;
callback()
}
)
}
const getDb = () => {
return db;
}
const connectclient = () => {
return mongoobject;
}
module.exports = {
connectDb,
getDb,
connectclient
}
我的插入功能是
function saveData(){
return new Promise(function (resolve, reject) {
try {
fs.access(dirPath, fs.F_OK, (err) => {
if (err) {
console.error(err)
return
}
const startTime = new Date();
let numlines = 0;
const fileReference = {}
fs.readdir(dirPath, function (err, filenames) {
if (err) {
console.error("Directory Not Found")
return;
}
filenames.forEach(function (filename) {
const readInterface = new readline(dirPath + filename);
let promises = [];
fileReference[filename] = 0
readInterface.on('line', function (line) {
fileReference[filename]++
let countcol = line.split('\t').length,
productname = line.split("\t"),
productsku = line.split("\t"),
productprice = line.split("\t");
let product_sku, product_name, product_price;
if (countcol == 3) {
product_sku = productname.splice(0, 2).join("-").toLowerCase();
product_name = productsku.splice(0, 2).join(" ");
product_price = productprice.splice(-1, 1);
} else if (countcol == 4) {
let product_sku_ini = productsku.splice(0, 2).join("").toLowerCase(),
product_sku_end = productsku.splice(0, 1).join(" ").toLowerCase(),
product_name_ini = productname.splice(0, 2).join(""),
product_name_end = productname.splice(0, 1).join(" ");
product_price = productprice.splice(-1, 1);
product_sku = product_sku_ini + "-" + product_sku_end
product_name = product_name_ini + " " + product_name_end
delete product_sku_ini, product_sku_end, product_name_ini, product_name_end,product_sku,product_name,product_price;
}
console.info('row start processing ==>>', filename, product_sku, line);
delete countcol, productname, productsku, productprice;
if (numlines >= 80000) {
readInterface.pause();
// console.log('promises:', promises)
Promise.all(promises)
.then(response => {
numlines = 0;
promises = [];
localStorage.setItem(filename, fileReference[filename]);
console.info(`filename Batch Resolved 1 ========>> ${filename}`, localStorage.getItem(filename))
console.log("====================================================== END 1============================================")
readInterface.resume()
// showHeapUses()
// setTimeout(() => process.exit(), 500)
// console.log('resume 1 time:', (new Date().getTime()) - startTime.getTime())
})
.catch(error => {
console.info(`Error in executing`, error)
numlines = 0;
readInterface.resume()
// console.log('resume 2 time:', (new Date()) - startTime)
})
}
console.log("num line", numlines)
numlines++
if(product_sku && product_name && product_price) {
const memoryUsedMb = process.memoryUsage().heapUsed / 1024 / 1024
console.info('the program used', memoryUsedMb, 'MB')
async.waterfall([
function (callback) {
const checkskuexists = async () => {
let checksamepro = { sku:product_sku };
let check_doc_exist = db.collection(collectionName).findOne(checksamepro);
return check_doc_exist;
}
checkskuexists().then(function(result) {
if(result === null){
callback(true, 'PRODUCT_NOT_FOUND');
}else{
callback(null, result.sku);
}
});
},
function (sku, callback) {
db.collection(collectionName).updateOne({sku:sku}, {$set:{price:product_price}});
resolve();
},
],function (err, result){
if (err) {
if (err && result == 'PRODUCT_NOT_FOUND') {
prodetails = {name:product_name, sku:product_sku, price:product_price, status:'active'}
db.collection(collectionName).insertOne(prodetails, function(err, res) {
if (err) throw err;
client.close();
});
}
resolve();
}
});
delete product_sku, product_name, product_price;
}else {
console.log('product is undefined -- so skiped', line);
delete product_sku, product_name, product_price;
}
});
readInterface.on('error', function (error) {
delete readInterface, fileReference, promises;
console.error("Error in reading file: ", error);
});
readInterface.on('end', function () {
// printPerformance(startTime);
localStorage.removeItem(filename);
Promise.all(promises)
.then(response => {
console.info(`filename Batch Resolved 2 ========>> ${filename} -- Completed`)
console.log("====================================================== END 2============================================")
})
.catch(error => {
console.info(`Error in executing`, error)
})
delete readInterface, fileReference, promises;
});
});
});
});
} catch (error) {
reject("ERROR GOES HERE ", error)
}
});
}
我得到的错误是:
MongoNetworkError: connection 812 to 127.0.0.1:27017 closed
at /var/www/html/reg-dealers-mongodb-script/node_modules/mongodb/lib/cmap/connection.js:68:15
at Map.forEach (<anonymous>)
at Socket.<anonymous> (/var/www/html/reg-dealers-mongodb-script/node_modules/mongodb/lib/cmap/connection.js:67:20)
at Socket.emit (events.js:314:20)
at Socket.EventEmitter.emit (domain.js:483:12)
at TCP.<anonymous> (net.js:675:12)
这是在插入 10k 或 20k 之后出现的,有时大约 100k 只是连接 812 没有。是不同的,其余错误是相同的,所以知道为什么会发生这种情况以及如何解决问题。
解决方案
您的插入功能太大而无法遵循。但是从错误中可以清楚地看出,您的插入函数正在从池中创建一个新的 mongo 连接。
通常,当单个连接用于阻塞操作时,池中的其他可用连接用于处理需要使用数据库的传入请求。正如您已将 1000 定义为池大小,这就是您看到的原因812 connection closed
。
一次插入不是一个明智的主意,60 Million data
而是将其分成较小的部分来组织您的数据库架构并遵循一些推荐的方法来保存它们(如集合最大大小、读/写操作、索引等)。当你需要保存多个文档时,你应该使用下面的 mongo 函数:
db.collection.insertMany(
[ <document 1> , <document 2>, ... ],
{
writeConcern: <document>,
ordered: <boolean>
}
)
有关更多详细信息,请查看此。
推荐阅读
- c++ - 没有 return 语句的函数中的 c++ 返回什么?
- c# - 将 C# 代码转换为 PHP:创建空字节数组 php 并使用套接字发送
- xamarin.ios - 适用于 Xamarin 的 Mac Mini
- php - 通过改造在文件上传期间关闭 Smack 连接
- apache-spark - Apache spark - 窗口函数,FIRST_VALUE 不起作用
- linux - 为什么 CMake 不尊重 LIBRARY_PATH 和 CPATH
- php - 为什么`mysqli_query()`返回null?我怎么能弄明白?
- linux - 独立辅助应用程序的缺点,而不是将其嵌入到主应用程序中
- c# - C# 裁剪位图 - 克隆与图形
- python - PySpark:在 sql 中访问向量元素