node.js - how to lock reading in mongodb with nodejs
问题描述
prehistory
I have a nodejs server which instances are running on multiple machines and every instance runs cron job once per day at the same time.
While one instance is running or has just finished its job, another instances should skip executing logic inside of jobs.
I've already had mongodb connection so I decided to save state of runned job and its time to DB and check/change it inside of every job's callback. The document model I chose to save job state in collection is:
interface JobDao {
_id: ObjectId;
type: string;
state: 'locked' | 'failed' | 'completed';
updatedDate: DateISO;
}
I use package "mongodb": "^3.6.3"
to make queries.
After some tries, I wonder if I can implement described bellow behavior or not. Also maybe somebody can suggest another solution to sync running jobs for multiple machines.
So solution I try to implement and I ask for help with:
- When cron starts, get job from DB.
- check state of job with such conditions:
- if it's locked and not expired -> skip logic (note: I use one hour expiration to prevent some unexpected issues when server was broken while running)
- if it's locked and expired -> change state of job to locked
- if it's not locked but was updated till last 5 minutes -> change state of job to locked
- execute logic due to condition above
- "unlock" job (update job's state in document)
But here's the issue I met. As there's no concurrency. Between getting and updating the document in one machine, another machines can get or update stale document with not relevant data.
I've tried such solutions as:
- findOneAndUpdate
- tried to add aggregation (here's the proplem to compare the exipration and it looks to be impossible).
- trtransactions
- bulk
But nothing had worked. I start thinking to change DB but maybe somebody can say how to implement it with mongodb or can recomend some more suitable solution?
解决方案
After a small rest I decided to start from the scratch and I finally found a solution. Here's my code example. It's not perfect, so I'm planning to refactore it, hovever, it works and solves my issue! Hope it'll help sb.
Small description of its logic
The "mediator" is the public method scheduleJob
. Logic order:
- when we schedule job, it creates new document for the type in DB if it doesn't exist.
- unlocks job if it's stale (it's stale if has been locked more than for a half an hour). Server can fall down while running job what cause infinite lock of job but checking stale job should help
- next step is locking unlocked job, othervise, finish the logic. It's possible when one instance finishes job just before next instance starts, so I added finishing of the job if the same job was running for last 5 minutes. It's important that such condition restricts frequency as jobs can't bet runned every 5 minutes but in my case it's suitable solution
CronJobDao
and CronJobModel
are the same and represent the document in DB:
export interface CronJobDao {
type: CronJobTypeEnum;
isLocked: boolean;
updatedAt: Date;
completedAt: Date;
}
Service with scheduleJob
method:
import { inject, injectable } from 'inversify';
import { Job, scheduleJob } from 'node-schedule';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobRepository } from './cron-job.repository';
@injectable()
export class CronJobService {
readonly halfHourMs = 30 * 60 * 1000;
readonly fiveMinutesMs = 5 * 60 * 1000;
constructor(
@inject(CronJobRepository) private cronJobRepository: CronJobRepository,
) {}
scheduleJob(type: CronJobTypeEnum, timeRule: string, callback: Function): Job {
this.cronJobRepository.registerJob(type).then();
return scheduleJob(
type,
timeRule,
async () => {
await this.unlockStaleJob(type);
const lockedJob = await this.cronJobRepository.lockJob(type);
if (!lockedJob) {
console.warn('Job has already been locked');
return;
}
if ((new Date().getTime() - lockedJob.completedAt?.getTime()) < this.fiveMinutesMs) {
await this.cronJobRepository.unlockJob(type);
console.warn('Job has recently been completed');
return;
}
console.info('Job is locked');
callback();
await this.cronJobRepository.completeJob(type);
console.info('Job is completed');
},
);
}
private async unlockStaleJob(type: CronJobTypeEnum): Promise<void> {
const staleJob = await this.cronJobRepository.unlockIfTimeExpired(type, this.halfHourMs);
if (!staleJob) {
return;
}
console.warn('Has stale job: ', JSON.stringify(staleJob));
}
}
Class for communication with DB:
import { inject, injectable } from 'inversify';
import { Db } from 'mongodb';
import { CronJobDao, mapCronJobDaoToModel } from '../core/daos/cron-job.dao';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobModel } from '../core/models/cron-job.model';
import { AbstractRepository } from '../core/utils/abstract.repository';
@injectable()
export class CronJobRepository extends AbstractRepository<CronJobDao> {
constructor(@inject(Db) db: Db) {
super(db, 'cron_jobs');
}
async registerJob(type: CronJobTypeEnum) {
const result = await this.collection.findOneAndUpdate(
{ type },
{
$setOnInsert: {
type,
isLocked: false,
updatedAt: new Date(),
},
},
{ upsert: true, returnOriginal: false },
);
return result.value;
}
async unlockIfTimeExpired(type: CronJobTypeEnum, expiredFromMs: number): Promise<CronJobModel | null> {
const expirationDate = new Date(new Date().getTime() - expiredFromMs);
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: true,
updatedAt: { $lt: expirationDate },
},
{
$set: {
updatedAt: new Date(),
isLocked: false,
},
});
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
async lockJob(type: CronJobTypeEnum) {
return this.toggleJobLock(type, false);
}
async unlockJob(type: CronJobTypeEnum) {
return this.toggleJobLock(type, true);
}
private async toggleJobLock(type: CronJobTypeEnum, stateForToggle: boolean): Promise<CronJobModel | null> {
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: stateForToggle,
},
{
$set: {
isLocked: !stateForToggle,
updatedAt: new Date(),
},
},
);
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
async completeJob(type: CronJobTypeEnum): Promise<CronJobModel | null> {
const currentDate = new Date();
const result = await this.collection.findOneAndUpdate(
{
type,
isLocked: true,
},
{
$set: {
isLocked: false,
updatedAt: currentDate,
completedAt: currentDate,
},
},
);
return result.value ? mapCronJobDaoToModel(result.value) : null;
}
}
推荐阅读
- flutter - 如何在颤动中模糊未选择的项目
- django - 应用 [web.1]:未找到:/static/js/index.js
- python - 我如何摆脱'Tkinter.Tcl 错误-无法识别图像文件中的数据'?
- css - CSS 中的实用程序类是什么?初级 CSS 开发人员
- sql-server - SSIS 错误对象“database_principals”、数据库“mssqlsystemresource”、架构“sys”的 SELECT 权限被拒绝。(错误:229)
- exception - 在 ARMv8 上的 EL1 中写入 RAM
- c# - 将 ASP.NET Web 应用程序迁移到 Mac Rider
- python - 复制到预分配列表比追加慢,这似乎违反直觉
- python-3.x - TypeError: __init__() 在带有 keras 的 SeqGAN 中为参数“g_lr”获得了多个值
- bash - 循环遍历一组随机文件