首页 > 解决方案 > 当多台服务器访问数据库时,如何使用 mongodb 只允许一个条目?

问题描述

我有多个“工作”服务器处理作业并访问同一个 MongoDB 数据库,但我只希望创建一条消息,并且绝不允许运行相同作业的两台服务器创建相同的消息。

发送消息时,其status字段设置为“已发送”,或者如果已禁用,则将其设置为“已禁用”。因此,首先它会检查是否有任何已发送或已禁用的消息。然后它创建一个文档,其中一个lockedAt字段设置为当前时间,并检查同一消息是否已被锁定。我使用该lockedAt字段的原因是,如果作业由于某种原因失败,它将允许锁过期并再次运行。

这似乎在大多数情况下都有效,但是如果两个“工人”在几毫秒内运行相同的工作,就会有一些消息通过,所以我的逻辑并不完美,但我无法弄清楚重复消息是如何被创建的。

如何使用 MongoDB 来防止同一作业同时运行并创建两次相同的消息?

// Check if there is a locked message.
// insert a new message or update if one is found but return old message (or nothing if one didn't' exist)
const messageQuery = {
    listingID: listing._id,
    messageRuleID: messageRule._id,
    reservationID: reservation._id
};

let message = await Message.findOne({
    ...messageQuery,
    ...{status: {$in: ["disabled", "sent"]}}
});
// If message has been sent or is disabled don't continue
if (message) {
    return;
}

message = await Message.findOneAndUpdate(
    messageQuery,
    {
        listingID: listing._id,
        messageRuleID: messageRule._id,
        reservationID: reservation._id,
        lockedAt: moment().toDate() // Check out the message with the current date
    },
    {upsert: true}
);
// If no message is defined, then it's new and not locked, move on.
if (message) {
    // If message has been locked for less than X minutes don't continue
    const cutoff = moment().subtract(
        Config.messageSendLock.amount,
        Config.messageSendLock.unit
    );
    if (message.lockedAt && moment(message.lockedAt).isAfter(cutoff)) {
        // Put the last lock time back
        await Message.findOneAndUpdate(messageQuery, {lockedAt: message.lockedAt});
        return;
    }
}

标签: javascriptmongodbmongoose

解决方案


它比这简单得多。在 Mongo中,写操作在文档级别是原子的

您需要做的就是为您的消息定义一个唯一键。

假设关键是:

const messageId = {
    listingID: listing._id,
    messageRuleID: messageRule._id,
    reservationID: reservation._id
};

那么你只需要2个写查询。第一个是您的 upsert 的“插入”一半:

try {
    await Message.create(messageId);
} catch (err) {
    if (err.code !== '11000') {  // ignore duplicate key error
        throw err
    }
}

这是幂等的,可以由多个worker同时执行。结果始终相同 - 集合中恰好有 1 个文档。

该索引可以在模式级别定义如下:

Message.index({
        listingID: 1,
        messageRuleID: 1,
        reservationID: 1
}, { unique: true })

重要提示:确保您不仅在模型上定义唯一索引,而且该索引已应用于数据库端的集合。Mongoose 有一些优化,并不总是应用索引。否则上面的代码将创建重复项。来自猫鼬文档的相关部分:

当您的应用程序启动时,Mongoose 会自动为您的模式中定义的每个索引调用 createIndex。Mongoose 将为每个索引依次调用 createIndex,并在所有 createIndex 调用成功或出现错误时在模型上发出 'index' 事件。虽然很适合开发,但建议在生产中禁用此行为,因为创建索引会导致显着的性能影响。通过将架构的 autoIndex 选项设置为 false 来禁用该行为,或者通过将选项 autoIndex 设置为 false 在连接上全局禁用该行为。

第二个查询是您的 upsert 的“更新”部分:

const cutoff = moment().subtract(
    Config.messageSendLock.amount,
    Config.messageSendLock.unit
);
    

message = await Message.findOneAndUpdate(
    {
        ...messageId,
        status: {$nin: ["disabled", "sent"]},
        $or: [
            { lockedAt: {$exists: false} }, 
            { lockedAt: {$lte: cutoff}}
        ]
    },
    { lockedAt: moment().toDate() }
);

// If no message is defined, then it's either sent, disabled, or is locked
if (!message) {
    return
}

同时从多个工作人员运行此查询将导致数据库端匹配文档上的IX 锁队列。第一个将添加/更新lockedAt字段并将文档返回给您的工作人员。所有连续查询都不会匹配过滤器并且不返回任何内容。


推荐阅读