首页 > 解决方案 > 使用 Node JS 脚本对 MongoDb 进行多线程 100 万次插入

问题描述

我有一个隔离的同步服务器,它从外部 ftp 服务器拉出一个标签受限的文本文件,并在处理后更新(保存)到 mongodb。我的代码看起来像这样

//this function pulls file from external ftp server 
async function upsteamFile() {
  try {
    let pythonProcess = spawn('python3', [configVar.ftpInbound, '/outbound/Items.txt', configVar.dataFiles.items], {encoding: 'utf8'});
    logger.info('FTP SERVER LOGS...' + '\n' + pythonProcess.stdout);
    await readItemFile();
    logger.info('The process of file is done');

    process.exit();
  } catch (upstreamError) {
    logger.error(upstreamError);

    process.exit();
  }
}


//this function connects to db and calls processing function for each row in the text file.
async function readItemFile(){
  try{
  logger.info('Reading Items File');

  let dataArray = fs.readFileSync(configVar.dataFiles.items, 'utf8').toString().split('\n');
  logger.info('No of Rows Read', dataArray.length);
  await dbConnect.connectToDB(configVar.db);
  logger.info('Connected to Database', configVar.db);
  while (dataArray.length) {
      await Promise.all( dataArray.splice(0, 5000).map(async (f) => {
      splitValues = f.split('|'); 
      await processItemsFile(splitValues)
    })
    )
    logger.info("Current batch finished processing")
  }
  logger.info("ALL batch finished processing")
}
  catch(PromiseError){
    logger.error(PromiseError)
  }
}




async function processItemsFile(splitValues) {
  try {
      // Processing of the file is done here and I am using 'save' in moongoose to write to db
     // data is cleaned and assigned to respective fields 
        if(!exists){
           let processedValues = new Products(assignedValues);
           let productDetails = await processedValues.save();  
    }
      return;
    }
  catch (error) {
   throw error
  }
}
upstream()

所以这需要大约 3 个小时来处理 100,000,000 行并在数据库中更新它。有什么办法可以加快速度。我的硬件非常有限。我正在使用一个基于 ec2 实例的 linux 服务器,它有 2 个核心和 4 gb ram。我应该使用像microjob这样的工作线程来运行 multi-threads 吗?如果是,那么我将如何去做或者这是最大的性能?

注意:我不能在 mongodb 中进行批量更新,因为保存时会触发 mongoose 预钩子

标签: node.jsmongodbmultithreadingworker-thread

解决方案


您可以随时尝试使用updateOne方法进行批量更新。

我也会考虑使用readFileStream而不是readFileSync

使用事件驱动的架构,您可以推送,假设每100k更新到数组块并同时对它们进行批量更新。

您可以在此操作期间触发 pre updateOne()(而不是save())钩子。

我用以下解决方案解决了一个类似的问题(更新 100k CSV 行):

  1. 创建一个readFileStream(多亏了这一点,如果文件很大,您的应用程序不会消耗太多堆内存)

我正在使用 CSV-parser npm 库将 CSV 文件解构为单独的数据行:

let updates = [];
fs.createReadStream('/filePath').pipe(csv())
    .on('data', row => {
        // ...do anything with the data
        updates.push({
            updateOne: {
                filter: { /* here put the query */ },
                update: [ /* any data you want to update */ ],
                upsert: true /* in my case I want to create record if it does not exist */
            }
        })
    })
    .on('end', async () => {
        await MyCollection.bulkWrite(data)
            .catch(err => {
                logger.error(err);
            })

        updates = []; // I just clean up the huge array
    })         

推荐阅读