amazon-web-services - AWS Lambda 未处理正确数量的 DynamoDB 记录
问题描述
我正在处理通过 Lambda 写入 Kinesis 的事件并将其推送到 DynamoDB。我注意到一些奇怪的问题,插入到 DynamoDB 的记录从来都不一样。
这是我的 Lambda 函数。
const AWS = require("aws-sdk");
const dynamodb = new AWS.DynamoDB({
region: 'us-east-1',
apiVersion: '2012-08-10',
httpOptions: {
connectTimeout: 1000,
timeout: 1000
},
});
const tablename = "events";
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
exports.handler = async (event, context) => {
let params = {
RequestItems: {
[tablename] : []
}
};
console.log("Batch size is: " + event.Records.length);
for (let i=0; i<event.Records.length; i++) {
let record = event.Records[i];
// Kinesis data is base64 encoded so decode here
let payload = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'));
params.RequestItems[tablename].push({
PutRequest: {
Item: {
"email": { "S": payload.email },
"activity_id_and_state": { "S": payload.activity_id_and_state },
"name": { "S": payload.name }
}
}
});
if (params.RequestItems[tablename].length == 25) { // dynamodb batch write size
console.log("Pushing full 25 items");
await deliver_records(params);
params.RequestItems[tablename] = [];
}
}
if (params.RequestItems[tablename].length > 0) { // Push pending items
console.log("Pushing : " + params.RequestItems[tablename].length + " items." );
await deliver_records(params);
}
};
async function deliver_records(records, retryCount = 0) {
records.RequestItems[tablename].forEach( record => {
console.log(record.PutRequest.Item.email);
});
if (retryCount > 0){
console.log("Retrying " + retryCount + " count is: " + records.RequestItems[tablename].length);
}
await dynamodb.batchWriteItem(records, async function(err, data) {
if (err) {
console.log("Errors", err);
} else {
if (data.UnprocessedItems[tablename] && data.UnprocessedItems[tablename].length > 0){
console.log("Processing unprocessed messages.. " + data.UnprocessedItems[tablename].length);
if (retryCount > 3){
console.log("Exceeded max retries.. sorry");
throw new Error(data.UnprocessedItems[tablename]);
}
await sleep(2 ** retryCount * 10);
records.RequestItems = data.UnprocessedItems;
return await deliver_records(records, retryCount++);
}
}
}).promise();
}
我没有看到任何超时和任何类型的异常。我看到的一些问题是:
- 我插入了 100 条记录,它们很好。删除 dynamoDB 中的所有记录并插入新的 150 条记录。现在除了 150 条记录之外,在 dynamoDB 中还可以看到一些旧记录。虽然 cloudwatch 日志显示 lambda 只处理了 150 条记录。
- 大多数情况下,总是有一些记录被遗漏并没有被处理并插入到 lambda 中。日志中没有异常或超时。
配置。Lambda 超时时间为 10s,dynamoDB 配置了 5 个 RCU 和 500 个 WCU。
任何指针都会有所帮助。谢谢
解决方案
推荐阅读
- vba - 单击时停止播放声音媒体
- css - 我如何在css中悬停运行另一个类
- c# - 有没有办法将我的 Angular ASP.net Core 模板转换为 ASP.net core MVC
- javascript - 使用 React 元素的卡片上的 prop.style 键“0”上的道具类型消息失败
- angular - 从 API 获取多个数据需要时间在 Angular 7 中加载
- c++ - 为什么函数返回后不调用析构函数?
- python - 获取html的所有内容
- r - 如何为共享“customerID”和“spouseID”的行创建“householdID”?
- javascript - 打字稿错误:类型“数字”不可分配给类型“从不”
- parsing - epsilon 的 LR 解析器