首页 > 解决方案 > AWS Lambda 未处理正确数量的 DynamoDB 记录

问题描述

我正在处理通过 Lambda 写入 Kinesis 的事件并将其推送到 DynamoDB。我注意到一些奇怪的问题,插入到 DynamoDB 的记录从来都不一样。

这是我的 Lambda 函数。

const AWS = require("aws-sdk");
const dynamodb = new AWS.DynamoDB({
    region: 'us-east-1',
    apiVersion: '2012-08-10',
    httpOptions: { 
        connectTimeout: 1000, 
        timeout: 1000 
    },
});

const tablename = "events";

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}

exports.handler = async (event, context) => {
    
    let params = {
        RequestItems: {
            [tablename] : []
        }
    };
    
    console.log("Batch size is: " + event.Records.length);
    
    for (let i=0; i<event.Records.length; i++) {
        
        let record = event.Records[i];
        
        // Kinesis data is base64 encoded so decode here
        let payload = JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'));
        
        params.RequestItems[tablename].push({
            PutRequest: {
                Item: {
                    "email": { "S": payload.email },
                    "activity_id_and_state": { "S": payload.activity_id_and_state },
                    "name": { "S": payload.name }
                }
            }
        });
        
        if (params.RequestItems[tablename].length == 25) { // dynamodb batch write size
            console.log("Pushing full 25 items");
            await deliver_records(params);
            params.RequestItems[tablename] = [];
        }
        
    }

    if (params.RequestItems[tablename].length > 0) { // Push pending items
        console.log("Pushing : " + params.RequestItems[tablename].length + " items." );
        await deliver_records(params);
    }

};

async function deliver_records(records, retryCount = 0) {
    
    records.RequestItems[tablename].forEach( record => {
        console.log(record.PutRequest.Item.email);
    });
    
    if (retryCount > 0){
        console.log("Retrying " + retryCount + " count is: " + records.RequestItems[tablename].length);
    }
    
    await dynamodb.batchWriteItem(records, async function(err, data) {
        if (err) {
            console.log("Errors", err);
        } else { 
            if (data.UnprocessedItems[tablename] && data.UnprocessedItems[tablename].length > 0){
                console.log("Processing unprocessed messages.. " + data.UnprocessedItems[tablename].length);
                if (retryCount > 3){
                    console.log("Exceeded max retries.. sorry");
                    throw new Error(data.UnprocessedItems[tablename]);
                }
                await sleep(2 ** retryCount * 10);
                records.RequestItems = data.UnprocessedItems;
                return await deliver_records(records, retryCount++);
            }
        }
    }).promise();
}

我没有看到任何超时和任何类型的异常。我看到的一些问题是:

  1. 我插入了 100 条记录,它们很好。删除 dynamoDB 中的所有记录并插入新的 150 条记录。现在除了 150 条记录之外,在 dynamoDB 中还可以看到一些旧记录。虽然 cloudwatch 日志显示 lambda 只处理了 150 条记录。
  2. 大多数情况下,总是有一些记录被遗漏并没有被处理并插入到 lambda 中。日志中没有异常或超时。

配置。Lambda 超时时间为 10s,dynamoDB 配置了 5 个 RCU 和 500 个 WCU。

任何指针都会有所帮助。谢谢

标签: amazon-web-servicesaws-lambdaamazon-dynamodbamazon-kinesis

解决方案


推荐阅读