How to lock reading in MongoDB with Node.js

Problem description

Background

I have a Node.js server whose instances run on multiple machines, and every instance runs a cron job once per day at the same time.

While one instance is running the job or has just finished it, the other instances should skip the logic inside the job.

I already had a MongoDB connection, so I decided to save the state of the running job and its time to the DB, and to check/change it inside every job's callback. The document model I chose for storing the job state in a collection is:

interface JobDao {
 _id: ObjectId;
 type: string;
 state: 'locked' | 'failed' | 'completed';
 updatedDate: DateISO;
}

I use package "mongodb": "^3.6.3" to make queries.

After several attempts, I wonder whether the behavior described below can be implemented at all. Maybe somebody can also suggest another way to synchronize running jobs across multiple machines.

So this is the solution I am trying to implement and am asking for help with:

But here is the issue I ran into: there is no concurrency control. Between getting and updating the document on one machine, other machines can get or update a stale document with outdated data.
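The issue above can be illustrated without a database. The sketch below is only a model: `InMemoryJobStates` and its methods are hypothetical stand-ins, not MongoDB APIs. It contrasts a separate read followed by a write with a single conditional write, which is the role a single-document `findOneAndUpdate` with the expected state in its filter plays in MongoDB.

```typescript
type JobState = 'locked' | 'failed' | 'completed';

interface JobStateDoc {
  type: string;
  state: JobState;
  updatedDate: Date;
}

// Hypothetical in-memory stand-in for the collection; not a MongoDB API.
class InMemoryJobStates {
  private readonly docs: Record<string, JobStateDoc> = {};

  insert(doc: JobStateDoc): void {
    this.docs[doc.type] = { ...doc };
  }

  // Plain read: the returned copy may be stale by the time the caller writes.
  read(type: string): JobStateDoc | undefined {
    const doc: JobStateDoc | undefined = this.docs[type];
    return doc ? { ...doc } : undefined;
  }

  // Unconditional write: it does not check what the state is right now.
  write(type: string, state: JobState, now: Date): void {
    const doc: JobStateDoc | undefined = this.docs[type];
    if (doc) {
      doc.state = state;
      doc.updatedDate = now;
    }
  }

  // Conditional write: the check and the change happen in one step.
  // Returns true only for the caller that actually took the lock.
  lockIfNotLocked(type: string, now: Date): boolean {
    const doc: JobStateDoc | undefined = this.docs[type];
    if (!doc || doc.state === 'locked') {
      return false;
    }
    doc.state = 'locked';
    doc.updatedDate = now;
    return true;
  }
}

function freshStore(): InMemoryJobStates {
  const store = new InMemoryJobStates();
  store.insert({ type: 'report', state: 'completed', updatedDate: new Date(0) });
  return store;
}

// Read-then-write: instances A and B both read before either one writes,
// so both believe the job is free.
function readThenWriteRace(): [boolean, boolean] {
  const store = freshStore();
  const now = new Date();
  const seenByA = store.read('report');
  const seenByB = store.read('report');
  const aRuns = seenByA !== undefined && seenByA.state !== 'locked';
  const bRuns = seenByB !== undefined && seenByB.state !== 'locked';
  if (aRuns) store.write('report', 'locked', now);
  if (bRuns) store.write('report', 'locked', now);
  return [aRuns, bRuns];
}

// Conditional write: whichever instance comes second finds the job locked.
function conditionalWriteRace(): [boolean, boolean] {
  const store = freshStore();
  const now = new Date();
  return [store.lockIfNotLocked('report', now), store.lockIfNotLocked('report', now)];
}
```

In this model `readThenWriteRace()` lets both instances run the job, while `conditionalWriteRace()` lets only the first one run it.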

I've tried such solutions as:

But nothing worked. I am starting to think about changing the DB, but maybe somebody can say how to implement this with MongoDB or recommend a more suitable solution?

Tags: node.js, mongodb, concurrency, cron

Solution


After a short break I decided to start from scratch, and I finally found a solution. Here's my code example. It's not perfect, so I'm planning to refactor it; however, it works and solves my issue! Hope it helps somebody.

Small description of its logic

The "mediator" is the public method scheduleJob. Logic order:

  • When a job is scheduled, a new document for the job type is created in the DB if one doesn't exist yet.
  • The job is unlocked if its lock is stale (it is stale if the job has been locked for more than half an hour). A server can go down while running a job, which would leave the job locked forever; the stale check guards against that.
  • The next step is to lock the unlocked job; if that fails, the logic finishes. One instance can finish the job just before the next instance starts, so the handler also exits if the same job was completed within the last 5 minutes. Note that this condition limits frequency, since a job can't run more often than every 5 minutes, but in my case that is acceptable.
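The order of checks above can be summarized as a pure function. This is only a sketch of the decision order: in the actual code each step is a separate atomic update in the DB, and the names `CronJobSnapshot`, `CronDecision` and `decideCronRun` are hypothetical.

```typescript
// Hypothetical snapshot of the job document; completedAt is missing
// until the job has completed at least once.
interface CronJobSnapshot {
  isLocked: boolean;
  updatedAt: Date;
  completedAt?: Date;
}

type CronDecision = 'run' | 'skip-locked' | 'skip-recently-completed';

const HALF_HOUR_MS = 30 * 60 * 1000;
const FIVE_MINUTES_MS = 5 * 60 * 1000;

// Mirrors the order described above: stale check, lock check, cooldown check.
function decideCronRun(
  job: CronJobSnapshot,
  now: Date,
  staleAfterMs: number = HALF_HOUR_MS,
  cooldownMs: number = FIVE_MINUTES_MS,
): CronDecision {
  const lockAgeMs = now.getTime() - job.updatedAt.getTime();
  const lockIsStale = job.isLocked && lockAgeMs > staleAfterMs;

  // A fresh lock means another instance is still running the job.
  if (job.isLocked && !lockIsStale) {
    return 'skip-locked';
  }

  // Another instance finished the job a moment ago.
  if (job.completedAt !== undefined && now.getTime() - job.completedAt.getTime() < cooldownMs) {
    return 'skip-recently-completed';
  }

  return 'run';
}
```

For example, a job locked 10 minutes ago is skipped, while a job whose lock is 31 minutes old is treated as stale and may run again.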

CronJobDao and CronJobModel have the same shape and represent the document in the DB:

export interface CronJobDao {
  type: CronJobTypeEnum;
  isLocked: boolean;
  updatedAt: Date;
  completedAt: Date; // not set until the job has completed for the first time
}
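Since there is meant to be exactly one document per job type, and the repository further down creates it with an upsert filtered by `type`, a unique index on `type` may be worth adding: without one, concurrent upserts from several instances can each insert a document. A minimal sketch, assuming the driver's `createIndex` method on the collection; `IndexableCollection` and `ensureUniqueJobType` are hypothetical names introduced here.

```typescript
// Minimal structural type covering the one collection method used here.
// It is an assumption that the driver's Collection satisfies it.
interface IndexableCollection {
  createIndex(keys: Record<string, 1 | -1>, options: { unique: boolean }): Promise<string>;
}

// Creates a unique index on `type` so that at most one document
// per job type can exist. Resolves to the index name.
async function ensureUniqueJobType(collection: IndexableCollection): Promise<string> {
  return collection.createIndex({ type: 1 }, { unique: true });
}
```

It could be called once at startup, before any job is registered, for example `await ensureUniqueJobType(db.collection('cron_jobs'))`.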

Service with scheduleJob method:

import { inject, injectable } from 'inversify';
import { Job, scheduleJob } from 'node-schedule';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobRepository } from './cron-job.repository';


@injectable()
export class CronJobService {
  readonly halfHourMs = 30 * 60 * 1000;
  readonly fiveMinutesMs = 5 * 60 * 1000;

  constructor(
    @inject(CronJobRepository) private cronJobRepository: CronJobRepository,
  ) {}

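  scheduleJob(type: CronJobTypeEnum, timeRule: string, callback: () => void | Promise<void>): Job {
    // Make sure the job document exists; log a failed registration instead of dropping the rejection.
    this.cronJobRepository.registerJob(type)
      .catch((error) => console.error('Failed to register job', error));

    return scheduleJob(
      type,
      timeRule,
      async () => {
        await this.unlockStaleJob(type);

        // Resolves to null when no unlocked document of this type was found.
        const lockedJob = await this.cronJobRepository.lockJob(type);

        if (!lockedJob) {
          console.warn('Job has already been locked');
          return;
        }

        // completedAt is missing until the job has completed for the first time.
        const completedAt = lockedJob.completedAt?.getTime();

        if (completedAt !== undefined && Date.now() - completedAt < this.fiveMinutesMs) {
          await this.cronJobRepository.unlockJob(type);
          console.warn('Job has recently been completed');
          return;
        }

        console.info('Job is locked');

        // Await the callback so the lock is held until asynchronous work has finished.
        // If it throws, the lock stays in place until the stale check releases it.
        await callback();

        await this.cronJobRepository.completeJob(type);
        console.info('Job is completed');
      },
    );
  }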

  private async unlockStaleJob(type: CronJobTypeEnum): Promise<void> {
    const staleJob = await this.cronJobRepository.unlockIfTimeExpired(type, this.halfHourMs);

    if (!staleJob) {
      return;
    }

    console.warn('Has stale job: ', JSON.stringify(staleJob));
  }
}

Class for communication with the DB:

import { inject, injectable } from 'inversify';
import { Db } from 'mongodb';
import { CronJobDao, mapCronJobDaoToModel } from '../core/daos/cron-job.dao';
import { CronJobTypeEnum } from '../core/enums/cron-job-type.enum';
import { CronJobModel } from '../core/models/cron-job.model';
import { AbstractRepository } from '../core/utils/abstract.repository';

@injectable()
export class CronJobRepository extends AbstractRepository<CronJobDao> {

  constructor(@inject(Db) db: Db) {
    super(db, 'cron_jobs');
  }

  async registerJob(type: CronJobTypeEnum) {
    const result = await this.collection.findOneAndUpdate(
      { type },
      {
        $setOnInsert: {
          type,
          isLocked: false,
          updatedAt: new Date(),
        },
      },
      { upsert: true, returnOriginal: false },
    );

    return result.value;
  }

  async unlockIfTimeExpired(type: CronJobTypeEnum, expiredFromMs: number): Promise<CronJobModel | null> {
    const expirationDate = new Date(new Date().getTime() - expiredFromMs);

    const result = await this.collection.findOneAndUpdate(
      {
        type,
        isLocked: true,
        updatedAt: { $lt: expirationDate },
      },
      {
        $set: {
          updatedAt: new Date(),
          isLocked: false,
        },
      });

    return result.value ? mapCronJobDaoToModel(result.value) : null;
  }

  async lockJob(type: CronJobTypeEnum) {
    return this.toggleJobLock(type, false);
  }

  async unlockJob(type: CronJobTypeEnum) {
    return this.toggleJobLock(type, true);
  }

  private async toggleJobLock(type: CronJobTypeEnum, stateForToggle: boolean): Promise<CronJobModel | null> {
    const result = await this.collection.findOneAndUpdate(
      {
        type,
        isLocked: stateForToggle,
      },
      {
        $set: {
          isLocked: !stateForToggle,
          updatedAt: new Date(),
        },
      },
    );

    return result.value ? mapCronJobDaoToModel(result.value) : null;
  }

  async completeJob(type: CronJobTypeEnum): Promise<CronJobModel | null> {
    const currentDate = new Date();

    const result = await this.collection.findOneAndUpdate(
      {
        type,
        isLocked: true,
      },
      {
        $set: {
          isLocked: false,
          updatedAt: currentDate,
          completedAt: currentDate,
        },
      },
    );

    return result.value ? mapCronJobDaoToModel(result.value) : null;
  }
}
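As a possible refinement, the stale check and the lock could be folded into one conditional update, so that acquiring the lock takes a single round trip. The sketch below only builds the filter and update documents; `buildAcquireLock` and `AcquireLockRequest` are hypothetical names, and the 30-minute threshold is passed in by the caller.

```typescript
interface AcquireLockRequest {
  filter: {
    type: string;
    $or: [{ isLocked: false }, { isLocked: true; updatedAt: { $lt: Date } }];
  };
  update: { $set: { isLocked: true; updatedAt: Date } };
}

// Matches the job document if it is unlocked OR if its lock is older
// than staleAfterMs, and locks it in the same update.
function buildAcquireLock(type: string, now: Date, staleAfterMs: number): AcquireLockRequest {
  const staleBefore = new Date(now.getTime() - staleAfterMs);

  return {
    filter: {
      type,
      $or: [
        { isLocked: false },
        { isLocked: true, updatedAt: { $lt: staleBefore } },
      ],
    },
    update: { $set: { isLocked: true, updatedAt: now } },
  };
}
```

The result would be passed to `this.collection.findOneAndUpdate(filter, update)` in the repository above; as in `toggleJobLock`, a missing `result.value` means another instance holds a fresh lock.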
