node.js - 使用 Node JS 脚本对 MongoDb 进行多线程 100 万次插入
问题描述
我有一个隔离的同步服务器,它从外部 ftp 服务器拉出一个标签受限的文本文件,并在处理后更新(保存)到 mongodb。我的代码看起来像这样
//this function pulls file from external ftp server
async function upsteamFile() {
try {
let pythonProcess = spawn('python3', [configVar.ftpInbound, '/outbound/Items.txt', configVar.dataFiles.items], {encoding: 'utf8'});
logger.info('FTP SERVER LOGS...' + '\n' + pythonProcess.stdout);
await readItemFile();
logger.info('The process of file is done');
process.exit();
} catch (upstreamError) {
logger.error(upstreamError);
process.exit();
}
}
//this function connects to db and calls processing function for each row in the text file.
async function readItemFile(){
try{
logger.info('Reading Items File');
let dataArray = fs.readFileSync(configVar.dataFiles.items, 'utf8').toString().split('\n');
logger.info('No of Rows Read', dataArray.length);
await dbConnect.connectToDB(configVar.db);
logger.info('Connected to Database', configVar.db);
while (dataArray.length) {
await Promise.all( dataArray.splice(0, 5000).map(async (f) => {
splitValues = f.split('|');
await processItemsFile(splitValues)
})
)
logger.info("Current batch finished processing")
}
logger.info("ALL batch finished processing")
}
catch(PromiseError){
logger.error(PromiseError)
}
}
async function processItemsFile(splitValues) {
try {
// Processing of the file is done here and I am using 'save' in moongoose to write to db
// data is cleaned and assigned to respective fields
if(!exists){
let processedValues = new Products(assignedValues);
let productDetails = await processedValues.save();
}
return;
}
catch (error) {
throw error
}
}
upstream()
所以这需要大约 3 个小时来处理 100,000,000 行并在数据库中更新它。有什么办法可以加快速度。我的硬件非常有限。我正在使用一个基于 ec2 实例的 linux 服务器,它有 2 个核心和 4 gb ram。我应该使用像microjob这样的工作线程来运行 multi-threads 吗?如果是,那么我将如何去做或者这是最大的性能?
注意:我不能在 mongodb 中进行批量更新,因为保存时会触发 mongoose 预钩子
解决方案
您可以随时尝试使用updateOne
方法进行批量更新。
我也会考虑使用readFileStream
而不是readFileSync
。
使用事件驱动的架构,您可以推送,假设每100k更新到数组块并同时对它们进行批量更新。
您可以在此操作期间触发 pre updateOne()
(而不是save()
)钩子。
我用以下解决方案解决了一个类似的问题(更新 100k CSV 行):
- 创建一个
readFileStream
(多亏了这一点,如果文件很大,您的应用程序不会消耗太多堆内存)
我正在使用 CSV-parser npm 库将 CSV 文件解构为单独的数据行:
let updates = [];
fs.createReadStream('/filePath').pipe(csv())
.on('data', row => {
// ...do anything with the data
updates.push({
updateOne: {
filter: { /* here put the query */ },
update: [ /* any data you want to update */ ],
upsert: true /* in my case I want to create record if it does not exist */
}
})
})
.on('end', async () => {
await MyCollection.bulkWrite(data)
.catch(err => {
logger.error(err);
})
updates = []; // I just clean up the huge array
})
推荐阅读
- java - 引发 NullPointerException,即使它不是 NULL
- highcharts - 如何使用 highcharts 调整树形图中矩形的大小
- python - 无法写入 bigquery - 权限被拒绝:Apache Beam Python - Google Dataflow
- android - 如何同时从当前启动 2 个应用程序(一个在后台)?
- javascript - 如何在选择框中添加多个选项
- php - 通过 pecl 安装 lua php 扩展失败
- .net - 正则表达式 - 不包含除 end 之外的某些字符
- python - Django 形式的 type="datetime-local"
- python - 代理后面的 pip 9.0.1 升级
- java - Jaunt 超链接用 %3F 和 %3D 替换值