首页 > 解决方案 > DynamoDb 条件插入

问题描述

我在我的应用程序中使用 C# 和 DynamoDb。我的设计假设只写只读。强烈禁止更新项目。仅插入新项目或读取现有项目。
假设我有付款项目的状态

{
  "PaymentInvoice":"001", //PK
  "Status":"2019-07-10T00:00:00#Approved" //SK
}

现在 2 个并发请求来自不同的客户端:第一个尝试Cancel付款,第二个尝试Settle(确认)付款。

2个插入是:

{
  "PaymentInvoice":"001", //PK
  "Status":"2019-07-10T00:01:00#Cancel" //SK
}

{
  "PaymentInvoice":"001", //PK
  "Status":"2019-07-10T00:01:00#Settle" //SK
}

所以这是比赛条件。
- 如果已取消
,您将无法结算 - 如果已结算,您将无法取消付款

明显的解决方案是:
1)创建事务
2)先进行查询,然后检查是否可以根据业务规则
插入3)插入新项目

所以问题是:1)是否可以锁定整个分区以防止从其他客户端插入新项目?2)是否有任何内置选项,如条件更新,但用于插入项目

标签: c#amazon-dynamodb

解决方案


在我开始之前快速澄清一点。我假设一个分布式的、面向服务的架构,并且我假设对这个 DynamoDB 表的所有读取和写入都只通过一个服务发生。我将使用“应用程序”来指代您正在构建的访问表的软件,并使用“客户端”来指代任何属于您的应用程序客户端的东西。


是否有任何内置选项,如条件更新,但用于插入项目?

简短的回答是“是”,使用基于版本号的乐观锁定

您需要首先将排序键更改为顺序事件编号。这将是用于对您的项目进行版本控制的属性,它使用条件更新,并且在解决您的问题的任何解决方案中产生的额外开销最少。

让我们首先查看建议模式的一些示例数据。我冒昧地添加了更多的状态类型。

invoiceId | eventNo | eventStatus |      datetime
----------|---------|-------------|---------------------
      111 |       0 | created     | 2019-07-11T00:01:00
      111 |       1 | approved    | 2019-07-11T00:02:00
      111 |       2 | modified    | 2019-07-12T00:03:00
      111 |       3 | approved    | 2019-07-12T00:04:00
      111 |       4 | settled     | 2019-07-13T00:05:00

乐观锁定的一般思想是您读取当前状态,然后通过插入带有增量eventNo(相当于version在 AWS 文档中)的新记录来更新状态,条件eventNo是 尚未存在invoiceId。之所以可行,是因为当您读取现有状态时,您总是知道下一个状态eventNo应该是什么(与使用时间戳作为排序键不同)。

为了使这一点更具体,在2019-07-13客户端发送结算发票的请求时,您的应用程序读取最新状态,看到状态eventNo为 3 并且status“已批准”,因此它向 DynamoDB 提交了一个UpdateItem请求(翻译为简单的英语)说

仅当不存在其他状态更新时才使用invoiceId=111and插入状态更新eventNo=4invoiceId=111eventNo=4

如果两个客户端同时尝试更新状态,则只有一个 UpdateItem 请求会成功,另一个会返回ConditionalCheckFailedException

好的,那么我该如何编码呢?

我已经十多年没有使用 C#,所以请原谅可能存在的任何语法或格式错误。

AmazonDynamoDBClient client = new AmazonDynamoDBClient();

// These should be method parameters/args, but I'm directly assigning them to 
// keep this code sample simple.
var invoiceToUpdate = 123;
var invoiceNewState = "settled";

// Here's the useful part of the sample code

// First we make a query to get the current state
var queryRequest = new QueryRequest
{
    TableName = "Invoices",
    KeyConditionExpression = "invoiceId = :inv",
    ExpressionAttributeValues = new Dictionary<string, AttributeValue> {
        {":inv", new AttributeValue {N = invoiceIdToUpdate.toString() }}
    },

    // This assumes we only need to check the current state and not any of the historical
    // state, so we'll limit the query to return only one result.
    Limit = 1,

    // If we're limiting it to only one result, change the sort order to make sure we get
    // the result with the largest eventNo (and therefore the most recent state).
    ScanIndexForward = false,

    // This is not strictly necessary for correctness because of the condition expression
    // in the PutItem request, but including it will help reduce the likelihood of getting
    // a ConditionalCheckFailedException later on.
    ConsistentRead = true
};

var queryResponse = client.Query(queryRequest);

// Check to see if there is any previous record for this invoice
// Setup the default values if the query returned no results
int newEventNo = 0;
string invoiceCurrentState = null;
if (queryResponse.Items.Count > 0) {{
    // If there is any existing record, then increment the eventNo for the new record
    var latestRecord = queryResponse.QueryResult().Items[0];
    newEventNo = Convert.ToInt32(latestRecord["eventNo"]) + 1;
    invoiceCurrentState = latestRecord["eventStatus"];
}

var isValidChange = MyBusinessLogic.isValidChange(invoiceCurrentState, invoiceNewState);

if (isValidChange) {
    var putItemRequest = new PutItemRequest
    {
        TableName = "Invoices",
        Item = new Dictionary<string,AttributeValue>() { 
            { "invoiceId", new AttributeValue {N = invoiceIdToUpdate.toString() }},
            { "eventNo", new AttributeValue {N = newEventNo.toString()}},
            { "eventStatus", new AttributeValue {S = invoiceNewState}},
            { "datetime", new AttributeValue {S = DateTime.UtcNow.ToString("yyyy-MM-ddTHH:mm:ssZ") }}
        },

        // Every item must have the key attributes, so using 'attribute_not_exists'
        // on a key attribute is functionally equivalent to an "item_not_exists" 
        // condition, causing the PUT to fail if it would overwrite anything at all.
        ConditionExpression = "attribute_not_exists('invoiceId')"
    };

    try {
        var putItemResponse = client.PutItem(putItemRequest);

    } catch (ConditionalCheckFailedException ex) {
        // How you handle this is up to you. I recommend choosing one of these options: 
        // (1) Throw an exception with a more useful message explaining that the state changed while the
        //     request was being processed
        // (2) Automatically try again, starting with the query and including the business validations,
        //     and if the state change is still valid, submit a new PutItem request with the new eventNo.
    }

    // Return an acknowledgement to the client

} else {
    throw new System.InvalidOperationException("Update is not valid for the current status of the invoice.");
}

这是我给出的代码示例的一些相关文档。


是否可以锁定整个分区以防止从其他客户端插入新项目?

是的,但它不是在数据库中实现的锁。锁定必须在您的应用程序中使用会产生额外开销的单独锁定库进行,因此除非您没有其他选择,否则不应使用此方法。对于无法更改表架构的任何阅读问题的人,您可以将您的应用程序设置为使用DynamoDB 锁定客户端锁定分区键,读取当前状态,执行写入(如果允许),然后释放锁。


推荐阅读