node.js - NodeJS 中的异步 foreach 或 for 循环
问题描述
我是 Node.js 的初学者,请见谅。下面这个 Lambda 函数的作用是:把 S3 中的一批文件打包压缩成一个 zip,再把压缩包上传回 S3。其中 listOfKeys
包含了所有待压缩对象的键(Key)列表。请注意其中的 for (const file in listOfKeys)
:当数据集很大时——也就是 listOfKeys
中的键非常多时——这个循环是同步逐个执行的,会导致 Lambda 超时。请问:有没有办法让这个循环异步或并行执行,从而及时完成文件压缩?
代码:
const AWS = require('aws-sdk');
const async = require('async');
const archiver = require('archiver');
const stream = require('stream');
const request = require('request');
// Region for all S3 API calls made by this Lambda.
const awsOptions = {
region: 'us-east-1'
};
// Shared S3 client, created once at module scope so it is reused
// across warm Lambda invocations.
const s3 = new AWS.S3(awsOptions);
/**
 * Returns a PassThrough stream whose contents are uploaded to
 * s3://bucket/key with ContentType "application/zip".
 *
 * Upload failures are re-emitted as an "error" event on the returned
 * stream. The original code did `if (err) throw err` inside the SDK
 * callback — throwing from an async callback escapes the event loop,
 * crashes the process, and never rejects the handler's promise.
 *
 * @param {string} bucket - Destination S3 bucket name.
 * @param {string} key - Destination object key.
 * @returns {stream.PassThrough} writable stream piped into s3.upload
 */
const streamTo = (bucket, key) => {
  const passthrough = new stream.PassThrough();
  s3.upload(
    {
      Bucket: bucket,
      Key: key,
      Body: passthrough,
      ContentType: "application/zip",
    },
    (err) => {
      // Surface the failure on the stream so the caller's "error"
      // listener (and its promise rejection) fires.
      if (err) passthrough.emit("error", err);
    }
  );
  return passthrough;
};
/**
 * Returns a lazy PassThrough stream for an S3 object: the GetObject
 * read stream is only opened when the first "data" listener is
 * attached (i.e. when archiver actually starts consuming this entry).
 * This avoids opening one S3 connection per key up front when many
 * entries are appended to the archive.
 *
 * @param {string} bucket - Source S3 bucket name.
 * @param {string} key - Source object key.
 * @returns {stream.PassThrough} stream that pipes the S3 object body
 */
const getStream = (bucket, key) => {
  let streamCreated = false;
  const passThroughStream = new stream.PassThrough();
  passThroughStream.on("newListener", (event) => {
    // Strict equality (original used `==`); only the first "data"
    // listener triggers the S3 read.
    if (!streamCreated && event === "data") {
      // Flip the flag before wiring the pipe so a re-entrant
      // "newListener" event cannot open a second S3 stream.
      streamCreated = true;
      const s3Stream = s3
        .getObject({ Bucket: bucket, Key: key })
        .createReadStream();
      s3Stream
        .on("error", (err) => passThroughStream.emit("error", err))
        .pipe(passThroughStream);
    }
  });
  return passThroughStream;
};
exports.handler = async (event, context, callback) => {
let totalKeys = 0;
const listOfKeys = [];
const SrcBucket = event.Records[0].s3.bucket.name;
const trigger_file = event.Records[0].s3.object.key;
const prefix = trigger_file.split('/')[0] + '/' + trigger_file.split('/')[1] + '/';
const dirToZip = trigger_file.split('/')[2].substr(0, trigger_file.split('/')[2].length - '.renamed'.length);
const s3ListFilter = prefix + dirToZip;
const destinationKey = prefix + `${dirToZip}.zip`;
const bucketParams = {
Bucket: SrcBucket,
Delimiter: '/',
Prefix: s3ListFilter + '/'
};
let data;
do {
bucketParams.Marker = (data && data.NextMarker) ? data.NextMarker : undefined;
data = await s3.listObjects(bucketParams).promise();
const contents = data.Contents;
totalKeys = totalKeys + contents.length;
listOfKeys.push(...contents.map(x => x.Key));
} while (data.IsTruncated);
console.log(`Total keys: ${listOfKeys.length}`);
await new Promise(async (resolve, reject) => {
var zipStream = streamTo(SrcBucket, destinationKey);
zipStream.on("close", resolve);
zipStream.on("end", resolve);
zipStream.on("error", reject);
var archive = archiver("zip");
archive.on("error", err => {
throw new Error(err);
});
archive.pipe(zipStream);
var keysCounter = 0;
listOfKeys.forEach(file => {
archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
keysCounter++
if (keysCounter >= Object.keys(listOfKeys).length) {
// Called at the end of the loop
archive.finalize();
}
});
//archive.finalize();
}).catch(err => {
throw new Error(err);
});
callback(null, {
body: { final_destination: destinationKey }
});
};
解决方案
如果由我来写,我可能会更彻底地重写整个函数。但只针对你的具体问题:把你的 listOfKeys.forEach
语句整体替换为下面这段代码:
await Promise
.all(
listOfKeys.map(key => archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] }))
);
archive.finalize();
注意:原来的 forEach 里最后一次迭代会调用 archive.finalize(),替换后必须在 Promise.all 之后显式调用它,否则压缩流永远不会结束,Lambda 仍会挂起直到超时。
推荐阅读
- java - @Configuration 注释的问题。Singelton bean 创建得太早
- android - 指针图标可以像其他视图一样在 RecyclerView 上设置吗?
- c++ - 为什么公共重载与某些编译器上的私有 using 指令冲突?
- java - 从 JDBC 触发 SQL 命令时出错 - java.sql.SQLSyntaxErrorException
- javascript - 从下拉列表中选择正确的图像
- javascript - Invariant Violation Invalid hook call 部署错误
- python - Python给出AttributeError:'PngImageFile'对象没有带有Pillow的'read'属性
- java - 如何使用 RestTemplate 发布 bean 列表,但 bean 必须是字节数组?
- javascript - Ajv 响应编号为字符串,带有 coerceTypes: true
- asynchronous - 当在同一个 React 块中使用不同的线程调度程序时会发生什么?