首页 > 解决方案 > 如何从生产者 lambda 到消费者 lambda 获取 SQS 批次?

问题描述

生产者 AWS lambda 使用 sqs.sendMessageBatch() 将一批例如 3 条消息推送到 AWS SQS 标准队列中。然而,对于原始 SQS 批处理的每个条目,消费者 AWS lambda 会被调用 3 次。

如何从生产者批次获取 SQS 批次到其实体中的消费者批次?

sqs.sendMessageBatch()

整个消息批处理触发 lambda。SQS 消费者 lambda 的事件接收整个输入批次。

我已将消费者 lambda 的批处理大小设置为默认值 10。生产者 lambda 发送包含 3 条消息(条目)的批处理。我观察到消费者 lambda get 为批处理中的每条消息调用了 3 次:

// PRODUCER
async function sendMessageBatch(params) {
    return new Promise(function (res, rej) {
        sqs.sendMessageBatch(params, function (err, data) {
            if (err) {
                rej(err);
            } else {
                res(data);
            }
        });
    });
}


exports.handler = async (event) => {
    .
    .
    .
    .
    var transaction;
    var entries = [];
    var entry;
    var transactions = await scan(sparams);
    // e.g. 3 messages
    for (var i = 0; i < transactions.length; i++) {
        transaction = transactions[i];
        console.log("TX: " + JSON.stringify(transaction));
        var msgBody = JSON.stringify({
            asset: transaction.asset,
            lastUpdatedAt: transaction.lastUpdatedAt,
            to: transaction.to,
            qty: transaction.qty,

        });
        entry = {
            Id: uuidv1(),
            MessageBody: msgBody,
            DelaySeconds: 0
        };
        entries.push(entry);
    }

    if (entries.length) {
        console.log("Entries: " + JSON.stringify(entries));
        var sendMessageBatchParams = {
            Entries: entries,
            QueueUrl: QUEUE_URL
        };
        await sendMessageBatch(sendMessageBatchParams);
    }
    return {};
}

//CONSUMER
exports.handler = async (event) => {
    // expected: 3 messages in event.Records (batch)
    for (var record of event.Records) {
        .
        .
        .
    }
}

标签: amazon-web-servicesaws-lambda

解决方案


serverless blog 上有一篇非常棒的文章,关于 Lambda 和 SQS 集成,其中有一个专门讨论批处理的部分。

确保正确设置了 batchSize(要处理的最大消息数)。如果一次交付很重要,请确保您的队列是 FIFO 队列,否则请确保通过确保处理是幂等的,注意规划弹性的第二点。


推荐阅读