Async foreach or for loop in NodeJS

Problem Description

I'm a beginner with NodeJS, so please bear with me. The Lambda function below zips files in S3 and uploads the zipped file back to S3. listOfKeys holds the list of keys to be zipped. If you look at the loop over listOfKeys (the listOfKeys.forEach call), with a large dataset, i.e. when listOfKeys contains a long list of keys, it runs synchronously and the Lambda times out. The question is: is there a way to run the loop asynchronously or in parallel, so that the files are zipped in time?

Code:

const AWS = require('aws-sdk');
const async = require('async');
const archiver = require('archiver');
const stream = require('stream');
const request = require('request');

const awsOptions = {
    region: 'us-east-1'
};
const s3 = new AWS.S3(awsOptions);

// Returns a PassThrough stream whose contents are uploaded to S3 as they are written.
const streamTo = (bucket, key) => {
    var passthrough = new stream.PassThrough();
    s3.upload({
        Bucket: bucket,
        Key: key,
        Body: passthrough,
        ContentType: "application/zip",
    },
        (err, data) => {
            if (err) throw err;
        }
    );
    return passthrough;
};

// Returns a lazy PassThrough stream: the S3 download starts only when the
// first "data" listener is attached, i.e. when archiver drains this entry.
const getStream = (bucket, key) => {
    let streamCreated = false;
    const passThroughStream = new stream.PassThrough();

    passThroughStream.on("newListener", event => {
        if (!streamCreated && event == "data") {
            const s3Stream = s3
                .getObject({ Bucket: bucket, Key: key })
                .createReadStream();
            s3Stream
                .on("error", err => passThroughStream.emit("error", err))
                .pipe(passThroughStream);

            streamCreated = true;
        }
    });
    return passThroughStream;
};

exports.handler = async (event, context, callback) => {

    let totalKeys = 0;
    const listOfKeys = [];
    const SrcBucket = event.Records[0].s3.bucket.name;
    const trigger_file = event.Records[0].s3.object.key;
    const prefix = trigger_file.split('/')[0] + '/' + trigger_file.split('/')[1] + '/';
    const dirToZip = trigger_file.split('/')[2].substr(0, trigger_file.split('/')[2].length - '.renamed'.length);
    const s3ListFilter = prefix + dirToZip;
    const destinationKey = prefix + `${dirToZip}.zip`;
    const bucketParams = {
        Bucket: SrcBucket,
        Delimiter: '/',
        Prefix: s3ListFilter + '/'
    };

    // Page through the bucket listing with Marker/NextMarker until all keys are collected.
    let data;
    do {
        bucketParams.Marker = (data && data.NextMarker) ? data.NextMarker : undefined;
        data = await s3.listObjects(bucketParams).promise();
        const contents = data.Contents;
        totalKeys = totalKeys + contents.length;
        listOfKeys.push(...contents.map(x => x.Key));
    } while (data.IsTruncated);

    console.log(`Total keys: ${listOfKeys.length}`);
    
    await new Promise(async (resolve, reject) => {
        var zipStream = streamTo(SrcBucket, destinationKey);
        zipStream.on("close", resolve);
        zipStream.on("end", resolve);
        zipStream.on("error", reject);
        var archive = archiver("zip");
        archive.on("error", err => {
            throw new Error(err);
        });
        archive.pipe(zipStream);

        var keysCounter = 0;
        // The loop in question: appends each key to the archive and finalizes
        // once the last entry has been queued.
        listOfKeys.forEach(file => {
            archive.append(getStream(SrcBucket, file), { name: file.split('/')[3] })
            keysCounter++
            if (keysCounter >= Object.keys(listOfKeys).length) {
                // Called at the end of the loop
                archive.finalize();
            }
        });

        //archive.finalize();
    }).catch(err => {
        throw new Error(err);
    });

    callback(null, {
        body: { final_destination: destinationKey }
    });
};

Tags: node.js, asynchronous, async-await, node-async

Solution

I would probably rewrite this whole thing more aggressively, but to answer your specific question: replace your listOfKeys.forEach statement with this:

await Promise.all(
    listOfKeys.map(key =>
        archive.append(getStream(SrcBucket, key), { name: key.split('/')[3] })
    )
);
archive.finalize(); // still required once all entries are queued (was called inside the old loop)
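
Note that archive.append queues an entry synchronously and returns the archiver instance rather than a promise, so the Promise.all above resolves right away; the S3 downloads themselves still start lazily as archiver drains each stream returned by getStream. If you also want to run genuinely asynchronous per-key work with a bound on how much runs at once, a small concurrency-limited loop is a common pattern. A minimal sketch, assuming an illustrative mapWithLimit helper and a limit of 10 (neither appears in the original post):

// Illustrative helper, not from the original post: runs `worker` over
// `items` with at most `limit` invocations in flight at any time.
const mapWithLimit = async (items, limit, worker) => {
    const results = [];
    let next = 0;
    // Start up to `limit` runners; each repeatedly claims the next index.
    const runners = Array.from(
        { length: Math.min(limit, items.length) },
        async () => {
            while (next < items.length) {
                const i = next++; // safe to claim: Node's event loop is single-threaded
                results[i] = await worker(items[i], i);
            }
        }
    );
    await Promise.all(runners);
    return results;
};

// Usage (hypothetical per-key task, shown only for illustration):
// await mapWithLimit(listOfKeys, 10, key =>
//     s3.headObject({ Bucket: SrcBucket, Key: key }).promise()
// );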
