amazon-web-services - 如何从生产者 lambda 到消费者 lambda 获取 SQS 批次?
问题描述
生产者 AWS lambda 使用 sqs.sendMessageBatch() 将一批例如 3 条消息推送到 AWS SQS 标准队列中。然而,对于原始 SQS 批处理的每个条目,消费者 AWS lambda 会被调用 3 次。
如何从生产者批次获取 SQS 批次到其实体中的消费者批次?
sqs.sendMessageBatch()
整个消息批处理触发 lambda。SQS 消费者 lambda 的事件接收整个输入批次。
我已将消费者 lambda 的批处理大小设置为默认值 10。生产者 lambda 发送包含 3 条消息(条目)的批处理。我观察到消费者 lambda get 为批处理中的每条消息调用了 3 次:
// PRODUCER
async function sendMessageBatch(params) {
return new Promise(function (res, rej) {
sqs.sendMessageBatch(params, function (err, data) {
if (err) {
rej(err);
} else {
res(data);
}
});
});
}
exports.handler = async (event) => {
.
.
.
.
var transaction;
var entries = [];
var entry;
var transactions = await scan(sparams);
// e.g. 3 messages
for (var i = 0; i < transactions.length; i++) {
transaction = transactions[i];
console.log("TX: " + JSON.stringify(transaction));
var msgBody = JSON.stringify({
asset: transaction.asset,
lastUpdatedAt: transaction.lastUpdatedAt,
to: transaction.to,
qty: transaction.qty,
});
entry = {
Id: uuidv1(),
MessageBody: msgBody,
DelaySeconds: 0
};
entries.push(entry);
}
if (entries.length) {
console.log("Entries: " + JSON.stringify(entries));
var sendMessageBatchParams = {
Entries: entries,
QueueUrl: QUEUE_URL
};
await sendMessageBatch(sendMessageBatchParams);
}
return {};
}
//CONSUMER
exports.handler = async (event) => {
// expected: 3 messages in event.Records (batch)
for (var record of event.Records) {
.
.
.
}
}
解决方案
serverless blog 上有一篇非常棒的文章,关于 Lambda 和 SQS 集成,其中有一个专门讨论批处理的部分。
确保正确设置了 batchSize(要处理的最大消息数)。如果一次交付很重要,请确保您的队列是 FIFO 队列,否则请确保通过确保处理是幂等的,注意规划弹性的第二点。
推荐阅读
- c# - 即使我做的一切正确,显示按钮也不显示任何内容(我认为)
- php - CodeIgniter:未定义的变量从 __construct() 获取
- twitter-bootstrap - Bootstrap 按钮故障排除
- python-3.x - AttributeError: module 'datetime' has no attribute 'today' error while using Selenium GeckoDriver and Firefox through Python
- swift - Flutter Channel 方法字符串不能作为 URL 工作
- c++ - 无法读取 FAT32 隐藏扇区数
- .net - 获取不记名令牌 - Oauth 2
- python - 在 Python 中跳过不需要的日期时绘制时间与日期
- android - 如何使用 TypeScript 在 React Native 中编写正确的 `Keyboard.addListener`?
- c++ - 如何从 Terralang 扩展 C++ 类?