multithreading - 更新指向最早待处理工作日志的指针的高效算法
问题描述
有一个包含记录的 DynamoDB 表。该表有 1 行专门用于跟踪nextPending
消息。分区键字段具有值nextPending
,并且具有messageId
. 表的所有其他行都包含消息。每条记录都有一个messageId
唯一且没有间隙且不递减的记录。此时此表中有超过 1M 条记录。记录表明一件作品。服务使用此队列并一一处理每条消息。在完全处理完消息后,它会做两件事。首先,它将记录的状态字段设置为终止状态。接下来,它使用= previous更新nextPending
记录。 messageId
messageId + 1
现在,我们正在尝试使该服务成为多线程的。我们将有多个线程并行处理消息,而不是一次处理一条消息,并且这些消息可以以随机顺序完成。
我正在寻找一种有效而优雅的算法来nextPending
适当地更新字段。想象一下 nextPending 当前的值为 101。服务中的各个线程正在处理 101 到 110 之间的消息。比如说,它们按以下顺序完成:109、105、104、108、103、102、114、101,...。在我们看到 101 完成后,我们需要将 nextPending 更新为 105。在 101 完成之前,我们无法更新nextPending
,因为 101 可能会失败并且需要重试,并且只要没有完成,就nextPending
应该始终指向最早的待处理消息。
一种算法可能是:
在完全处理完消息后,每个线程做两件事。首先,它将记录的状态字段设置为终止状态。接下来,它使用此时待处理nextPending
的最早的记录来更新记录messageId
。但是,此解决方案要求每个线程从 DynamoDB 读取多条记录并检查消息的状态。此外,几个线程现在将竞争有条件地更新表中的这一行。这也不是理想的事情。
另一种算法可能是:每个线程共享一个共同的滚动窗口,该窗口跟踪所有已完成的消息。消息完成后,一个专用线程会检查它是否为messageId
= nextPending + 1
。如果是,我们更新nextPending
为当前已完成消息序列中的最大数。在这种方法中,我们不会不必要地从 DynamoDB 中读取数据,也不会有多个线程相互竞争来完成相同的工作单元。
这里有更好的想法吗?
解决方案
推荐阅读
- ionic3 - 离子本地存储不会第一次加载
- javascript - FOSJSRoutingBundle - Routing.generate 不做任何事情
- sql - Doctrine - 如何在扩展实体的主键中添加字段?
- controller - 将 Swagger 与具有相同 URL 但响应类型不同的多个控制器一起使用
- php - 回答所有问题,除了那些有音频或视频的 Laravel Eloquent
- apache-kafka - Apache Kafka:减少 kafka 磁盘使用量
- flutter - 如何通过 Column/Row 容器中的左(或右)边缘提供项目的对齐方式?
- angular6 - @ngrx/store-devtools - 效果调用了两次
- sql - 统计 SQL Server 中每个月的行数
- java - 对象模拟不起作用(其为空)并在方法调用时返回 NullPointerException