javascript - 当多台服务器访问数据库时,如何使用 mongodb 只允许一个条目?
问题描述
我有多个“工作”服务器处理作业并访问同一个 MongoDB 数据库,但我只希望创建一条消息,并且绝不允许运行相同作业的两台服务器创建相同的消息。
发送消息时,其status
字段设置为“已发送”,或者如果已禁用,则将其设置为“已禁用”。因此,首先它会检查是否有任何已发送或已禁用的消息。然后它创建一个文档,其中一个lockedAt
字段设置为当前时间,并检查同一消息是否已被锁定。我使用该lockedAt
字段的原因是,如果作业由于某种原因失败,它将允许锁过期并再次运行。
这似乎在大多数情况下都有效,但是如果两个“工人”在几毫秒内运行相同的工作,就会有一些消息通过,所以我的逻辑并不完美,但我无法弄清楚重复消息是如何被创建的。
如何使用 MongoDB 来防止同一作业同时运行并创建两次相同的消息?
// Check if there is a locked message.
// insert a new message or update if one is found but return old message (or nothing if one didn't' exist)
const messageQuery = {
listingID: listing._id,
messageRuleID: messageRule._id,
reservationID: reservation._id
};
let message = await Message.findOne({
...messageQuery,
...{status: {$in: ["disabled", "sent"]}}
});
// If message has been sent or is disabled don't continue
if (message) {
return;
}
message = await Message.findOneAndUpdate(
messageQuery,
{
listingID: listing._id,
messageRuleID: messageRule._id,
reservationID: reservation._id,
lockedAt: moment().toDate() // Check out the message with the current date
},
{upsert: true}
);
// If no message is defined, then it's new and not locked, move on.
if (message) {
// If message has been locked for less than X minutes don't continue
const cutoff = moment().subtract(
Config.messageSendLock.amount,
Config.messageSendLock.unit
);
if (message.lockedAt && moment(message.lockedAt).isAfter(cutoff)) {
// Put the last lock time back
await Message.findOneAndUpdate(messageQuery, {lockedAt: message.lockedAt});
return;
}
}
解决方案
它比这简单得多。在 Mongo中,写操作在文档级别是原子的。
您需要做的就是为您的消息定义一个唯一键。
假设关键是:
const messageId = {
listingID: listing._id,
messageRuleID: messageRule._id,
reservationID: reservation._id
};
那么你只需要2个写查询。第一个是您的 upsert 的“插入”一半:
try {
await Message.create(messageId);
} catch (err) {
if (err.code !== '11000') { // ignore duplicate key error
throw err
}
}
这是幂等的,可以由多个worker同时执行。结果始终相同 - 集合中恰好有 1 个文档。
该索引可以在模式级别定义如下:
Message.index({
listingID: 1,
messageRuleID: 1,
reservationID: 1
}, { unique: true })
重要提示:确保您不仅在模型上定义唯一索引,而且该索引已应用于数据库端的集合。Mongoose 有一些优化,并不总是应用索引。否则上面的代码将创建重复项。来自猫鼬文档的相关部分:
当您的应用程序启动时,Mongoose 会自动为您的模式中定义的每个索引调用 createIndex。Mongoose 将为每个索引依次调用 createIndex,并在所有 createIndex 调用成功或出现错误时在模型上发出 'index' 事件。虽然很适合开发,但建议在生产中禁用此行为,因为创建索引会导致显着的性能影响。通过将架构的 autoIndex 选项设置为 false 来禁用该行为,或者通过将选项 autoIndex 设置为 false 在连接上全局禁用该行为。
第二个查询是您的 upsert 的“更新”部分:
const cutoff = moment().subtract(
Config.messageSendLock.amount,
Config.messageSendLock.unit
);
message = await Message.findOneAndUpdate(
{
...messageId,
status: {$nin: ["disabled", "sent"]},
$or: [
{ lockedAt: {$exists: false} },
{ lockedAt: {$lte: cutoff}}
]
},
{ lockedAt: moment().toDate() }
);
// If no message is defined, then it's either sent, disabled, or is locked
if (!message) {
return
}
同时从多个工作人员运行此查询将导致数据库端匹配文档上的IX 锁队列。第一个将添加/更新lockedAt
字段并将文档返回给您的工作人员。所有连续查询都不会匹配过滤器并且不返回任何内容。
推荐阅读
- sql - 按状态分组的社区计数
- wpf - 我需要
.config 文件以在 WPF 中正常运行 - winapi - Windows 10 API 例程的汇编程序调用约定
- python - 如何从 Pandas DF 创建(正确)一个 NumPy 数组
- sql - 根据条件删除/修改一个表中的行 - Oracle DBMS
- go - 为什么我的 Google Cloud Run 服务器返回 CORS 错误?
- c# - 遍历大型 2 列列表的最佳方法
- lisp - 在 Lisp 中将 hashtable 中的每个值重置为 nil
- python - 熊猫爆炸错误 - 列必须是标量
- c# - C# Lambda / 扩展方法使 4 个实体之间的结果变平?