首页 > 解决方案 > 一些插入后出错 MongoNetworkError: connection 812 to 127.0.0.1:27017 closed in Node js

问题描述

我想使用 Node js 脚本在 Mongo DB 中插入大约 6000 万个数据,所以我创建了一个连接并像这样重用它:

连接.js

const MongoClient = require("mongodb").MongoClient,
    { mongourl,dbName } = require('../../env');
    let db;
    let mongoobject;

    const option = {
        useUnifiedTopology: true,
        useNewUrlParser: true,
        socketTimeoutMS: 300000,
        poolSize:1000,
        keepAlive: 300000,
        connectTimeoutMS: 300000,
    };

    const connectDb = (callback) => {
        if (db) return callback()
        MongoClient.connect( mongourl, option, 
            (err, database) => {
                if (err) return console.log(err)
                db = database.db(dbName);
                mongoobject = database;
                callback()
            }
        )
    }

    const getDb = () => {
        return db;
    }

    const connectclient = () => {
        return mongoobject;
    }


    module.exports = {
        connectDb,
        getDb,
        connectclient
    }

我的插入功能是

function saveData(){
  return new Promise(function (resolve, reject) {
    try {
      fs.access(dirPath, fs.F_OK, (err) => {
          if (err) {
              console.error(err)
              return
          }
          const startTime = new Date();
          let numlines = 0;
          const fileReference = {}

          fs.readdir(dirPath, function (err, filenames) {
              if (err) {
                  console.error("Directory Not Found")
                  return;
              }
              filenames.forEach(function (filename) {
                  const readInterface = new readline(dirPath + filename);
                  let promises = [];
                  fileReference[filename] = 0

                  readInterface.on('line', function (line) {
                      fileReference[filename]++
                      let countcol = line.split('\t').length,
                          productname = line.split("\t"),
                          productsku = line.split("\t"),
                          productprice = line.split("\t");

                      let product_sku, product_name, product_price;

                      if (countcol == 3) {
                          product_sku = productname.splice(0, 2).join("-").toLowerCase();
                          product_name = productsku.splice(0, 2).join(" ");
                          product_price = productprice.splice(-1, 1);

                      } else if (countcol == 4) {
                          let product_sku_ini = productsku.splice(0, 2).join("").toLowerCase(),
                              product_sku_end = productsku.splice(0, 1).join(" ").toLowerCase(),
                              product_name_ini = productname.splice(0, 2).join(""),
                              product_name_end = productname.splice(0, 1).join(" ");

                          product_price = productprice.splice(-1, 1);
                          product_sku = product_sku_ini + "-" + product_sku_end
                          product_name = product_name_ini + " " + product_name_end

                          delete product_sku_ini, product_sku_end, product_name_ini, product_name_end,product_sku,product_name,product_price;

                      }
                      console.info('row start processing ==>>', filename, product_sku, line);
                      delete countcol, productname, productsku, productprice;

                      if (numlines >= 80000) {
                          readInterface.pause();
                          // console.log('promises:', promises)

                          Promise.all(promises)
                              .then(response => {

                                  numlines = 0;
                                  promises = [];
                                  localStorage.setItem(filename, fileReference[filename]);
                                  console.info(`filename Batch Resolved 1 ========>> ${filename}`, localStorage.getItem(filename))
                                  console.log("====================================================== END 1============================================")
                                  readInterface.resume()
                                  // showHeapUses()
                                  // setTimeout(() => process.exit(), 500)
                                  // console.log('resume 1 time:', (new Date().getTime()) - startTime.getTime())
                              })
                              .catch(error => {
                                  console.info(`Error in executing`, error)
                                  numlines = 0;
                                  readInterface.resume()
                                  // console.log('resume 2 time:', (new Date()) - startTime)
                              })
                      }
                      console.log("num line", numlines)
                      numlines++
                      
                      if(product_sku && product_name && product_price) {
                          const memoryUsedMb = process.memoryUsage().heapUsed / 1024 / 1024
                          console.info('the program used', memoryUsedMb, 'MB')
                          async.waterfall([
                              function (callback) {
                                  const checkskuexists = async () => {
                                      let checksamepro = { sku:product_sku };
                                      let check_doc_exist = db.collection(collectionName).findOne(checksamepro);
                                      return check_doc_exist;
                                  }
                                  checkskuexists().then(function(result) {
                                      if(result === null){
                                          callback(true, 'PRODUCT_NOT_FOUND');
                                      }else{
                                          callback(null, result.sku);
                                      }
                                  });
                              },
                              function (sku, callback) {
                                  db.collection(collectionName).updateOne({sku:sku}, {$set:{price:product_price}});
                                  resolve();
                              },
                          ],function (err, result){
                            if (err) {
                                if (err && result == 'PRODUCT_NOT_FOUND') {
                                  prodetails = {name:product_name, sku:product_sku, price:product_price, status:'active'}
                                  db.collection(collectionName).insertOne(prodetails, function(err, res) {
                                    if (err) throw err;
                                    client.close();
                                  });
                                }
                                resolve();
                            }
                          });
                          delete product_sku, product_name, product_price;
                      }else {
                          console.log('product is undefined -- so skiped', line);
                          delete product_sku, product_name, product_price;
                      }

                  });

                  readInterface.on('error', function (error) {
                      delete readInterface, fileReference, promises;
                      console.error("Error in reading file: ", error);
                  });

                  readInterface.on('end', function () {
                      // printPerformance(startTime);
                      localStorage.removeItem(filename);
                      Promise.all(promises)
                          .then(response => {
                              console.info(`filename Batch Resolved 2 ========>> ${filename} -- Completed`)
                              console.log("====================================================== END 2============================================")
                          })
                          .catch(error => {
                              console.info(`Error in executing`, error)
                          })
                      delete readInterface, fileReference, promises;
                  });
              });
          });
      });
    } catch (error) {
        reject("ERROR GOES HERE ", error)
    }
  });
}

我得到的错误是:

MongoNetworkError: connection 812 to 127.0.0.1:27017 closed
        at /var/www/html/reg-dealers-mongodb-script/node_modules/mongodb/lib/cmap/connection.js:68:15
        at Map.forEach (<anonymous>)
        at Socket.<anonymous> (/var/www/html/reg-dealers-mongodb-script/node_modules/mongodb/lib/cmap/connection.js:67:20)
        at Socket.emit (events.js:314:20)
        at Socket.EventEmitter.emit (domain.js:483:12)
        at TCP.<anonymous> (net.js:675:12)

这是在插入 10k 或 20k 之后出现的,有时大约 100k 只是连接 812 没有。是不同的,其余错误是相同的,所以知道为什么会发生这种情况以及如何解决问题。

标签: node.jsmongodbmongodb-queryconnectiondatabase-connection

解决方案


您的插入功能太大而无法遵循。但是从错误中可以清楚地看出,您的插入函数正在从池中创建一个新的 mongo 连接。

通常,当单个连接用于阻塞操作时,池中的其他可用连接用于处理需要使用数据库的传入请求。正如您已将 1000 定义为池大小,这就是您看到的原因812 connection closed

一次插入不是一个明智的主意,60 Million data而是将其分成较小的部分来组织您的数据库架构并遵循一些推荐的方法来保存它们(如集合最大大小、读/写操作、索引等)。当你需要保存多个文档时,你应该使用下面的 mongo 函数:

db.collection.insertMany(
   [ <document 1> , <document 2>, ... ],
   {
      writeConcern: <document>,
      ordered: <boolean>
   }
)

有关更多详细信息,请查看


推荐阅读