首页 > 解决方案 > ioRedis.Cluster 的玩笑有间歇性结果 - 未调用异步回调

问题描述

我正在尝试使用 docker 在集群模式下测试 ioredis 中的订阅。我在邮递员的实际测试中确实取得了成功,但是当我用 Jest 进行测试时,它变得断断续续,我进入Timeout - Async callback was not invoked within the 30000 ms timeout specified by jest.setTimeout.Timeout了控制台。

我的集群运行良好,这是我的 index.js

/* eslint-disable max-lines, no-unused-vars  */
const { App } = require('@MYCOMPANY/node-base');
const Redis = require('ioredis');
const Stream = require('stream');

const DEFAULT_SETTINGS = {
  cluster: false,
};

let instanceCache;

/**
  * @class
*/
class DatabaseCache {
  /**
   * @constructor
   * @param    {{
   *  host: String,
   *  port: Number,
   *  db: Number|String,
   *  TTL: Number,
   *  cluster: Boolean,
   *  nodes: Redis.ClusterNode[],
   *  options: Redis.ClusterOptions
   * }}  settings - Config cache
  */
  constructor(settings) {
    this.settings = {
      ...DEFAULT_SETTINGS,
      ...settings,
    };
  }

  /**
   * GetClient returns instance client
   * @return {Redis.Cluster|Redis.Redis} A Cache object.
   */
  getClient() {
    try {
      if (!this.instanceCache) {
        this.instanceCache = (this.settings.cluster)
          ? new Redis.Cluster(this.settings.nodes, this.settings.options)
          : new Redis(this.settings);

        this.on();

        return this.instanceCache;
      }

      return this.instanceCache;
    } catch (error) {
      throw new Error(error);
    }
  }

  /**
   * @private
   * On client events
 */
  on() {
    this.instanceCache.on('message', () => {
      App.Logger.current().debug({ msg: `redis.token.connect.success ${process.env.REDIS_PRODUCT_HOST}` });
    });

    this.instanceCache.on('error', (error) => {
      App.Logger.current().error({ error, msg: 'redis.token.connect.error' });
    });
  }

  /**
   * Select database
   * WARNING: This method is disable in cluster mode
   * @param {string|number} db - Database name or number
   * @return {void}
 */
  select(db) {
    return new Promise((resolve, reject) => {
      this.getClient().select(
        db,
        err => ((err) ? reject(err) : resolve()),
      );
    });
  }

  /**
   * Get by key
   * @param {Redis.KeyType} key - Key
   * @return {string} - Key value
 */
  get(key) {
    return new Promise((resolve, reject) => {
      this.getClient().get(
        key,
        (err, replay) => ((err) ? reject(err) : resolve(replay)),
      );
    });
  }

  /**
   * GetMany keys
   * WARNING: In cluster mode "mget" works by searching with keys inside a single slot
   * @param {Array.<Redis.KeyType>} keys - Key
   * @return {Array.<{key: Redis.KeyType, value: string}> - Key value
 */
  getMany(keys) {
    return new Promise((resolve, reject) => {
      try {
        const formartedData = (err, result) => {
          if (err) return reject(err);

          const formated = (keys && keys.length)
            ? keys.map((key, idx) => ({ key, value: result[idx] })) : [];

          return resolve(formated);
        };

        return this.getClient().mget(keys, formartedData);
      } catch (error) {
        return reject(new Error(error));
      }
    });
  }

  /**
   * SET key-value in cache
   * @param {Redis.KeyType} key - Key set in cache
   * @param {Redis.ValueType} value - Value to key
   * @param {object} settings - Options set
      * @property {EX|PX}  settings.type - Type time TTL
      * @property {number} settings.TTL - TTL
   * @return {boolean} It's true SET save success
  */
  set(key, value, settings = { TTL: this.settings.TTL, type: 'EX' }) {
    return new Promise((resolve, reject) => {
      this.getClient().set(
        key,
        value,
        settings.type,
        settings.TTL,
        (err, replay) => ((err) ? reject(err) : resolve((!!replay))),
      );
    });
  }

  /**
   * SETNX key-value in cache
   * @param {Redis.KeyType} key - Key set in cache
   * @param {Redis.ValueType} value - Value to key
   * @return {number} It's 1 SET save success
  */
  setnx(key, value) {
    return new Promise((resolve, reject) => {
      this.getClient().setnx(
        key,
        value,
        (err, replay) => ((err) ? reject(err) : resolve(replay)),
      );
    });
  }

  /**
   * @private
   * ScanStream read data
   * @param {Stream.Readable} scan - scanner
   * @return {Array} The values found in cache.
 */
  scanStream(scan) {
    return new Promise((resolve, reject) => {
      const keysFound = [];

      scan.on('data', elem => elem.map(item => keysFound.push(item)));

      scan.on('end', err => ((err) ? reject(err) : resolve(keysFound)));
    });
  }

  /**
   * @private
   * ScanCluster read data in cluster
   * @param {Redis.ScanStreamOption} options - Options scan
   * @return {Array} The values found in cache.
 */
  scanCluster(options) {
    return new Promise(async (resolve) => {
      const nodes = this.getClient().nodes('master');

      const keysFound = await Promise.all(
        nodes.map(
          node => this.scanStream(node.scanStream(options)),
        ),
      );

      return resolve([].concat(...keysFound));
    });
  }

  /**
   * Scan read data
   * WARNING: In cluster mode "scan" search in ALL nodes
   * @param {Redis.ScanStreamOption} options - Options scan
   * @return {Array} The values found in cache.
 */
  async scan(options) {
    const instance = this.getClient();

    if (instance.isCluster) return this.scanCluster(options);

    const keys = await this.scanStream(instance.scanStream(options));

    return keys;
  }

  /**
   * Delete Key
   * @param {Redis.KeyType} key - Key
   * @return {number}
 */
  delete(key) {
    return new Promise((resolve, reject) => {
      this.getClient().del(
        key,
        (err, replay) => ((err) ? reject(err) : resolve(replay)),
      );
    });
  }

  /**
   * Flushdb
   * WARNING: In cluster mode "flusbdb" is disable
   * @return {string} - "OK"
 */
  flusbdb() {
    return new Promise((resolve, reject) => {
      if (this.settings.cluster) reject(new Error('Flush disable in cluster mode'));

      this.getClient().flushdb(
        (err, replay) => ((err) ? reject(err) : resolve(replay)),
      );
    });
  }

  /**
   * DeleteKeyLike
   * @param {string} keyMatch - keyMatch match pattern
   * @return {number} - Numbers key of delete
 */
  async deleteKeyLike(keyMatch) {
    if (keyMatch) {
      const keys = await this.scan({ count: 1000, match: keyMatch });

      if (keys && keys.length) {
        for (let index = 0; index < keys.length; index += 1) {
          // eslint-disable-next-line no-await-in-loop
          await this.delete(keys[index]);
        }

        return keys.length;
      }

      return 0;
    }

    return 0;
  }

  /**
   * Subscribe to event
   * @param {string} key event to subscribe to
   * @param {function} handler function to handle the received event
   */
  subscribe(key, handler) {
    const client = this.getClient();

    client.subscribe(key, () => client.on('message', handler));
  }
}

module.exports = DatabaseCache;

这是我的 index.spec.js

/* eslint-disable max-lines */
const Redis = require('ioredis');
const DatabaseCache = require('../../../../src/helpers/cache');

jest.setTimeout(30000);

describe('Helpers > Cache interface ', () => {
  describe('> Single Mode', () => {
    let cache = new DatabaseCache({});

    beforeAll(() => {
      cache = new DatabaseCache({
        db: 2,
        host: 'localhost',
        port: 6379,
      });
    });

    it('getClient return instance Redis when cluster mode disable', (done) => {
      expect(cache.getClient()).toBeInstanceOf(Redis);
      done();
    });

    it('select Database', async (done) => {
      await cache.select('7');

      done();
    });

    it('select Database error', (done) => {
      expect(cache.select())
        .rejects
        .toThrow();

      done();
    });

    it('set Key', async (done) => {
      const key = 's1-teste';
      const value = JSON.stringify({ key });

      const result = await cache.set(key, value, { TTL: 3000, type: 'EX' });

      expect(result).toEqual(true);

      done();
    });

    it('set Key and test expire Key', async (done) => {
      const key = 's2-teste';
      const value = JSON.stringify({ key });

      const result = await cache.set(key, value, { TTL: 300, type: 'PX' });

      setTimeout(async () => {
        const keyFound = await cache.get(key);

        expect(keyFound).toEqual(null);
        expect(result).toEqual(true);

        done();
      }, (400));
    });

    it('get Key', async (done) => {
      const key = 't1';
      const value = JSON.stringify({ key });

      await cache.set(key, value, { TTL: 3000, type: 'EX' });

      const keyValue = await cache.get(key);

      expect(keyValue).toEqual(value);

      done();
    });

    it('get Key error', (done) => {
      const cacheError = new DatabaseCache({
        db: 2,
        host: 'localhost121',
        port: 6379,
      });

      expect(cacheError.get(undefined))
        .rejects
        .toThrow();

      done();
    });

    it('getMany Keys', async (done) => {
      const key1 = 't1-998899';
      const value1 = JSON.stringify({ key1 });

      const key2 = 't1-998800';
      const value2 = JSON.stringify({ key2 });

      await cache.set(key1, value1, { TTL: 3000, type: 'EX' });
      await cache.set(key2, value2, { TTL: 3000, type: 'EX' });

      const keysFound = await cache.getMany([key1, key2]);

      expect(keysFound).toEqual([{ key: key1, value: value1 }, { key: key2, value: value2 }]);

      done();
    });

    it('getMany Keys error', (done) => {
      const cacheError = new DatabaseCache({
        db: 2,
        host: 'localhost121',
        port: 6379,
      });

      expect(cacheError.getMany(undefined))
        .rejects
        .toThrow();

      done();
    });

    it('setnx Key', async (done) => {
      const key = 'snx1-teste';
      const value = JSON.stringify({ key });

      const result = await cache.setnx(key, value);
      const result2 = await cache.setnx(key, value);

      cache.delete(key);

      expect(result).toEqual(1); // insert is success
      expect(result2).toEqual(0); // not insert

      done();
    });

    it('scan keys by match', async (done) => {
      const cacheScan = new DatabaseCache({
        db: 3,
        host: 'localhost',
        port: 6379,
      });

      const prefix = 't1-';

      const keys = [
        { key: `${prefix}998800`, value: JSON.stringify({ key: `${prefix}998800` }) },
        { key: `${prefix}998811`, value: JSON.stringify({ key: `${prefix}998811` }) },
        { key: `${prefix}998822`, value: JSON.stringify({ key: `${prefix}998822` }) },
        { key: `${prefix}998833`, value: JSON.stringify({ key: `${prefix}998833` }) },
        { key: 't2-998833', value: JSON.stringify({ key: 't2-998833' }) },
      ];

      for (let index = 0; index < keys.length; index += 1) {
        const element = keys[index];

        // eslint-disable-next-line no-await-in-loop
        await cacheScan.set(element.key, element.value, { TTL: 3000, type: 'EX' });
      }

      const results = await cacheScan.scan({
        match: `${prefix}*`,
      });

      expect(results.length).toBe((keys.length - 1));
      expect(results).toContain(keys[0].key);
      expect(results).toContain(keys[1].key);
      expect(results).toContain(keys[2].key);
      expect(results).toContain(keys[3].key);

      done();
    });

    it('delete Key', async (done) => {
      const key = 'del1-teste';
      const value = JSON.stringify({ key });

      await cache.setnx(key, value);
      const delResult = await cache.delete(key);
      const getResult = await cache.get(key);

      expect(delResult).toEqual(1); // delete is success
      expect(getResult).toEqual(null);

      done();
    });

    it('deleteKeyLike by match', async (done) => {
      const cacheDeleteLike = new DatabaseCache({
        db: 1,
        host: 'localhost',
        port: 6379,
      });

      const prefix = 'dellike1-';

      const keys = [
        { key: `${prefix}998800`, value: JSON.stringify({ key: `${prefix}998800` }) },
        { key: `${prefix}998811`, value: JSON.stringify({ key: `${prefix}998811` }) },
        { key: `${prefix}998822`, value: JSON.stringify({ key: `${prefix}998822` }) },
        { key: `${prefix}998833`, value: JSON.stringify({ key: `${prefix}998833` }) },
      ];

      for (let index = 0; index < keys.length; index += 1) {
        const element = keys[index];

        // eslint-disable-next-line no-await-in-loop
        await cacheDeleteLike.set(element.key, element.value, { TTL: 3000, type: 'EX' });
      }

      const results = await cacheDeleteLike.deleteKeyLike(`${prefix}*`);

      expect(results).toBe((keys.length));

      done();
    });

    it('deleteKeyLike by match (undefined)', async (done) => {
      const cacheDeleteLike = new DatabaseCache({
        db: 1,
        host: 'localhost',
        port: 6379,
      });

      const prefix = 'dellike2-';

      const keys = [
        { key: `${prefix}998800`, value: JSON.stringify({ key: `${prefix}998800` }) },
        { key: `${prefix}998811`, value: JSON.stringify({ key: `${prefix}998811` }) },
        { key: `${prefix}998822`, value: JSON.stringify({ key: `${prefix}998822` }) },
        { key: `${prefix}998833`, value: JSON.stringify({ key: `${prefix}998833` }) },
      ];

      for (let index = 0; index < keys.length; index += 1) {
        const element = keys[index];

        // eslint-disable-next-line no-await-in-loop
        await cacheDeleteLike.set(element.key, element.value, { TTL: 3000, type: 'EX' });
      }

      const results = await cacheDeleteLike.deleteKeyLike(undefined);

      expect(results).toBe(0);

      done();
    });

    it('deleteKeyLike by match (keys notfound)', async (done) => {
      const cacheDeleteLike = new DatabaseCache({
        db: 1,
        host: 'localhost',
        port: 6379,
      });

      const prefix = 'dellike3-';

      const keys = [
        { key: `${prefix}998800`, value: JSON.stringify({ key: `${prefix}998800` }) },
        { key: `${prefix}998811`, value: JSON.stringify({ key: `${prefix}998811` }) },
        { key: `${prefix}998822`, value: JSON.stringify({ key: `${prefix}998822` }) },
        { key: `${prefix}998833`, value: JSON.stringify({ key: `${prefix}998833` }) },
      ];

      for (let index = 0; index < keys.length; index += 1) {
        const element = keys[index];

        // eslint-disable-next-line no-await-in-loop
        await cacheDeleteLike.set(element.key, element.value, { TTL: 3000, type: 'EX' });
      }

      const results = await cacheDeleteLike.deleteKeyLike('dellike4-*');

      expect(results).toBe(0);

      done();
    });

    it('flushdb', async (done) => {
      const cacheFlushdb = new DatabaseCache({
        db: 4,
        host: 'localhost',
        port: 6379,
      });

      const results = await cacheFlushdb.flusbdb();

      expect(results).toBe('OK');

      done();
    });

    it('subscribe', async (done) => {
      const settings = {
        db: 0,
        host: 'localhost',
        port: 6379,
      };

      const publisher = new DatabaseCache(settings);
      const subscriber = new DatabaseCache(settings);

      const myKey = 'somekey';

      publisher
        .getClient()
        .send_command('config', ['set', 'notify-keyspace-events', 'Ex'], () => {
          const key = `__keyevent@${settings.db}__:expired`;

          // eslint-disable-next-line max-nested-callbacks
          subscriber.subscribe(key, (_, expiredKey) => {
            expect(expiredKey).toEqual(myKey);
            done();
          });
        });

      await publisher.set(myKey, 'someValue', { TTL: 2, type: 'EX' });
    });
  });

  describe('> Cluster Mode', () => {
    let settings;
    let publisher;
    let subscriber;

    beforeAll(() => {
      settings = {
        cluster: true,
        db: 0,
        nodes: [
          { host: '127.0.0.1', port: 7000 },
          { host: '127.0.0.1', port: 7001 },
          { host: '127.0.0.1', port: 7002 },
          { host: '127.0.0.1', port: 7003 },
          { host: '127.0.0.1', port: 7004 },
          { host: '127.0.0.1', port: 7005 },
        ],
        options: {
          scaleReads: 'slave',
        },
      };

      publisher = new DatabaseCache(settings);
      subscriber = new DatabaseCache(settings);
    });

    it('getClient return instance Redis.Cluster when cluster mode enable', (done) => {
      expect(publisher.getClient()).toBeInstanceOf(Redis.Cluster);
      done();
    });

    it('scan in cliuster mode', async (done) => {
      const prefix = '{t1}'; // hashtag is keys

      const keys = [
        { key: `${prefix}-998800`, value: JSON.stringify({ key: `${prefix}-998800` }) },
        { key: `${prefix}-998811`, value: JSON.stringify({ key: `${prefix}-998811` }) },
        { key: `${prefix}-998822`, value: JSON.stringify({ key: `${prefix}-998822` }) },
        { key: `${prefix}-998833`, value: JSON.stringify({ key: `${prefix}-998833` }) },
        { key: '{t2}-998833', value: JSON.stringify({ key: 't2-998833' }) },
      ];

      for (let index = 0; index < keys.length; index += 1) {
        const element = keys[index];

        // eslint-disable-next-line no-await-in-loop
        await publisher.set(element.key, element.value, { TTL: 3000, type: 'EX' });
      }

      const results = await publisher.scan({
        match: `${prefix}*`,
      });

      expect(results.length).toBe((keys.length - 1));
      expect(results).toContain(keys[0].key);
      expect(results).toContain(keys[1].key);
      expect(results).toContain(keys[2].key);
      expect(results).toContain(keys[3].key);

      done();
    });

    it('flushdb in cluster mode', (done) => {
      expect(publisher.flusbdb())
        .rejects
        .toThrow();

      done();
    });

    it('subscribe in cluster mode', async (done) => {
      const myKey = 'somekey';
      const key = `__keyevent@${settings.db}__:expired`;

      await publisher.set(myKey, 'someValue', { TTL: 2, type: 'Ex' });

      await subscriber.subscribe(key, (_msg, expiredKey) => {
        expect(expiredKey).toEqual(myKey);
        done();
      });
    });
  });
});

除了最后一个是这样的之外,所有测试都在工作:

    it('subscribe in cluster mode', async (done) => {
      const myKey = 'somekey';
      const key = `__keyevent@${settings.db}__:expired`;

      await publisher.set(myKey, 'someValue', { TTL: 2, type: 'Ex' });

      await subscriber.subscribe(key, (_msg, expiredKey) => {
        expect(expiredKey).toEqual(myKey);
        done();
      });
    });

这是一个屏幕截图: 唯一不成功的测试

这是我的集群运行: 在 docker 中运行的集群

标签: testingredisjestjspublish-subscribeioredis

解决方案


推荐阅读