testing - ioRedis.Cluster 的玩笑有间歇性结果 - 未调用异步回调
问题描述
我正在尝试使用 docker 在集群模式下测试 ioredis 中的订阅。我在邮递员的实际测试中确实取得了成功,但是当我用 Jest 进行测试时,它变得断断续续,我进入Timeout - Async callback was not invoked within the 30000 ms timeout specified by jest.setTimeout.Timeout
了控制台。
我的集群运行良好,这是我的 index.js
/* eslint-disable max-lines, no-unused-vars */
const { App } = require('@MYCOMPANY/node-base');
const Redis = require('ioredis');
const Stream = require('stream');
const DEFAULT_SETTINGS = {
cluster: false,
};
let instanceCache;
/**
* @class
*/
class DatabaseCache {
/**
* @constructor
* @param {{
* host: String,
* port: Number,
* db: Number|String,
* TTL: Number,
* cluster: Boolean,
* nodes: Redis.ClusterNode[],
* options: Redis.ClusterOptions
* }} settings - Config cache
*/
constructor(settings) {
this.settings = {
...DEFAULT_SETTINGS,
...settings,
};
}
/**
* GetClient returns instance client
* @return {Redis.Cluster|Redis.Redis} A Cache object.
*/
getClient() {
try {
if (!this.instanceCache) {
this.instanceCache = (this.settings.cluster)
? new Redis.Cluster(this.settings.nodes, this.settings.options)
: new Redis(this.settings);
this.on();
return this.instanceCache;
}
return this.instanceCache;
} catch (error) {
throw new Error(error);
}
}
/**
* @private
* On client events
*/
on() {
this.instanceCache.on('message', () => {
App.Logger.current().debug({ msg: `redis.token.connect.success ${process.env.REDIS_PRODUCT_HOST}` });
});
this.instanceCache.on('error', (error) => {
App.Logger.current().error({ error, msg: 'redis.token.connect.error' });
});
}
/**
* Select database
* WARNING: This method is disable in cluster mode
* @param {string|number} db - Database name or number
* @return {void}
*/
select(db) {
return new Promise((resolve, reject) => {
this.getClient().select(
db,
err => ((err) ? reject(err) : resolve()),
);
});
}
/**
* Get by key
* @param {Redis.KeyType} key - Key
* @return {string} - Key value
*/
get(key) {
return new Promise((resolve, reject) => {
this.getClient().get(
key,
(err, replay) => ((err) ? reject(err) : resolve(replay)),
);
});
}
/**
* GetMany keys
* WARNING: In cluster mode "mget" works by searching with keys inside a single slot
* @param {Array.<Redis.KeyType>} keys - Key
* @return {Array.<{key: Redis.KeyType, value: string}> - Key value
*/
getMany(keys) {
return new Promise((resolve, reject) => {
try {
const formartedData = (err, result) => {
if (err) return reject(err);
const formated = (keys && keys.length)
? keys.map((key, idx) => ({ key, value: result[idx] })) : [];
return resolve(formated);
};
return this.getClient().mget(keys, formartedData);
} catch (error) {
return reject(new Error(error));
}
});
}
/**
* SET key-value in cache
* @param {Redis.KeyType} key - Key set in cache
* @param {Redis.ValueType} value - Value to key
* @param {object} settings - Options set
* @property {EX|PX} settings.type - Type time TTL
* @property {number} settings.TTL - TTL
* @return {boolean} It's true SET save success
*/
set(key, value, settings = { TTL: this.settings.TTL, type: 'EX' }) {
return new Promise((resolve, reject) => {
this.getClient().set(
key,
value,
settings.type,
settings.TTL,
(err, replay) => ((err) ? reject(err) : resolve((!!replay))),
);
});
}
/**
* SETNX key-value in cache
* @param {Redis.KeyType} key - Key set in cache
* @param {Redis.ValueType} value - Value to key
* @return {number} It's 1 SET save success
*/
setnx(key, value) {
return new Promise((resolve, reject) => {
this.getClient().setnx(
key,
value,
(err, replay) => ((err) ? reject(err) : resolve(replay)),
);
});
}
/**
* @private
* ScanStream read data
* @param {Stream.Readable} scan - scanner
* @return {Array} The values found in cache.
*/
scanStream(scan) {
return new Promise((resolve, reject) => {
const keysFound = [];
scan.on('data', elem => elem.map(item => keysFound.push(item)));
scan.on('end', err => ((err) ? reject(err) : resolve(keysFound)));
});
}
/**
* @private
* ScanCluster read data in cluster
* @param {Redis.ScanStreamOption} options - Options scan
* @return {Array} The values found in cache.
*/
scanCluster(options) {
return new Promise(async (resolve) => {
const nodes = this.getClient().nodes('master');
const keysFound = await Promise.all(
nodes.map(
node => this.scanStream(node.scanStream(options)),
),
);
return resolve([].concat(...keysFound));
});
}
/**
* Scan read data
* WARNING: In cluster mode "scan" search in ALL nodes
* @param {Redis.ScanStreamOption} options - Options scan
* @return {Array} The values found in cache.
*/
async scan(options) {
const instance = this.getClient();
if (instance.isCluster) return this.scanCluster(options);
const keys = await this.scanStream(instance.scanStream(options));
return keys;
}
/**
* Delete Key
* @param {Redis.KeyType} key - Key
* @return {number}
*/
delete(key) {
return new Promise((resolve, reject) => {
this.getClient().del(
key,
(err, replay) => ((err) ? reject(err) : resolve(replay)),
);
});
}
/**
* Flushdb
* WARNING: In cluster mode "flusbdb" is disable
* @return {string} - "OK"
*/
flusbdb() {
return new Promise((resolve, reject) => {
if (this.settings.cluster) reject(new Error('Flush disable in cluster mode'));
this.getClient().flushdb(
(err, replay) => ((err) ? reject(err) : resolve(replay)),
);
});
}
/**
* DeleteKeyLike
* @param {string} keyMatch - keyMatch match pattern
* @return {number} - Numbers key of delete
*/
async deleteKeyLike(keyMatch) {
if (keyMatch) {
const keys = await this.scan({ count: 1000, match: keyMatch });
if (keys && keys.length) {
for (let index = 0; index < keys.length; index += 1) {
// eslint-disable-next-line no-await-in-loop
await this.delete(keys[index]);
}
return keys.length;
}
return 0;
}
return 0;
}
/**
* Subscribe to event
* @param {string} key event to subscribe to
* @param {function} handler function to handle the received event
*/
subscribe(key, handler) {
const client = this.getClient();
client.subscribe(key, () => client.on('message', handler));
}
}
module.exports = DatabaseCache;
这是我的 index.spec.js
/* eslint-disable max-lines */
const Redis = require('ioredis');
const DatabaseCache = require('../../../../src/helpers/cache');
jest.setTimeout(30000);
describe('Helpers > Cache interface ', () => {
describe('> Single Mode', () => {
let cache = new DatabaseCache({});
beforeAll(() => {
cache = new DatabaseCache({
db: 2,
host: 'localhost',
port: 6379,
});
});
it('getClient return instance Redis when cluster mode disable', (done) => {
expect(cache.getClient()).toBeInstanceOf(Redis);
done();
});
it('select Database', async (done) => {
await cache.select('7');
done();
});
it('select Database error', (done) => {
expect(cache.select())
.rejects
.toThrow();
done();
});
it('set Key', async (done) => {
const key = 's1-teste';
const value = JSON.stringify({ key });
const result = await cache.set(key, value, { TTL: 3000, type: 'EX' });
expect(result).toEqual(true);
done();
});
it('set Key and test expire Key', async (done) => {
const key = 's2-teste';
const value = JSON.stringify({ key });
const result = await cache.set(key, value, { TTL: 300, type: 'PX' });
setTimeout(async () => {
const keyFound = await cache.get(key);
expect(keyFound).toEqual(null);
expect(result).toEqual(true);
done();
}, (400));
});
it('get Key', async (done) => {
const key = 't1';
const value = JSON.stringify({ key });
await cache.set(key, value, { TTL: 3000, type: 'EX' });
const keyValue = await cache.get(key);
expect(keyValue).toEqual(value);
done();
});
it('get Key error', (done) => {
const cacheError = new DatabaseCache({
db: 2,
host: 'localhost121',
port: 6379,
});
expect(cacheError.get(undefined))
.rejects
.toThrow();
done();
});
it('getMany Keys', async (done) => {
const key1 = 't1-998899';
const value1 = JSON.stringify({ key1 });
const key2 = 't1-998800';
const value2 = JSON.stringify({ key2 });
await cache.set(key1, value1, { TTL: 3000, type: 'EX' });
await cache.set(key2, value2, { TTL: 3000, type: 'EX' });
const keysFound = await cache.getMany([key1, key2]);
expect(keysFound).toEqual([{ key: key1, value: value1 }, { key: key2, value: value2 }]);
done();
});
it('getMany Keys error', (done) => {
const cacheError = new DatabaseCache({
db: 2,
host: 'localhost121',
port: 6379,
});
expect(cacheError.getMany(undefined))
.rejects
.toThrow();
done();
});
it('setnx Key', async (done) => {
const key = 'snx1-teste';
const value = JSON.stringify({ key });
const result = await cache.setnx(key, value);
const result2 = await cache.setnx(key, value);
cache.delete(key);
expect(result).toEqual(1); // insert is success
expect(result2).toEqual(0); // not insert
done();
});
it('scan keys by match', async (done) => {
const cacheScan = new DatabaseCache({
db: 3,
host: 'localhost',
port: 6379,
});
const prefix = 't1-';
const keys = [
{ key: `${prefix}998800`, value: JSON.stringify({ key: `${prefix}998800` }) },
{ key: `${prefix}998811`, value: JSON.stringify({ key: `${prefix}998811` }) },
{ key: `${prefix}998822`, value: JSON.stringify({ key: `${prefix}998822` }) },
{ key: `${prefix}998833`, value: JSON.stringify({ key: `${prefix}998833` }) },
{ key: 't2-998833', value: JSON.stringify({ key: 't2-998833' }) },
];
for (let index = 0; index < keys.length; index += 1) {
const element = keys[index];
// eslint-disable-next-line no-await-in-loop
await cacheScan.set(element.key, element.value, { TTL: 3000, type: 'EX' });
}
const results = await cacheScan.scan({
match: `${prefix}*`,
});
expect(results.length).toBe((keys.length - 1));
expect(results).toContain(keys[0].key);
expect(results).toContain(keys[1].key);
expect(results).toContain(keys[2].key);
expect(results).toContain(keys[3].key);
done();
});
it('delete Key', async (done) => {
const key = 'del1-teste';
const value = JSON.stringify({ key });
await cache.setnx(key, value);
const delResult = await cache.delete(key);
const getResult = await cache.get(key);
expect(delResult).toEqual(1); // delete is success
expect(getResult).toEqual(null);
done();
});
it('deleteKeyLike by match', async (done) => {
const cacheDeleteLike = new DatabaseCache({
db: 1,
host: 'localhost',
port: 6379,
});
const prefix = 'dellike1-';
const keys = [
{ key: `${prefix}998800`, value: JSON.stringify({ key: `${prefix}998800` }) },
{ key: `${prefix}998811`, value: JSON.stringify({ key: `${prefix}998811` }) },
{ key: `${prefix}998822`, value: JSON.stringify({ key: `${prefix}998822` }) },
{ key: `${prefix}998833`, value: JSON.stringify({ key: `${prefix}998833` }) },
];
for (let index = 0; index < keys.length; index += 1) {
const element = keys[index];
// eslint-disable-next-line no-await-in-loop
await cacheDeleteLike.set(element.key, element.value, { TTL: 3000, type: 'EX' });
}
const results = await cacheDeleteLike.deleteKeyLike(`${prefix}*`);
expect(results).toBe((keys.length));
done();
});
it('deleteKeyLike by match (undefined)', async (done) => {
const cacheDeleteLike = new DatabaseCache({
db: 1,
host: 'localhost',
port: 6379,
});
const prefix = 'dellike2-';
const keys = [
{ key: `${prefix}998800`, value: JSON.stringify({ key: `${prefix}998800` }) },
{ key: `${prefix}998811`, value: JSON.stringify({ key: `${prefix}998811` }) },
{ key: `${prefix}998822`, value: JSON.stringify({ key: `${prefix}998822` }) },
{ key: `${prefix}998833`, value: JSON.stringify({ key: `${prefix}998833` }) },
];
for (let index = 0; index < keys.length; index += 1) {
const element = keys[index];
// eslint-disable-next-line no-await-in-loop
await cacheDeleteLike.set(element.key, element.value, { TTL: 3000, type: 'EX' });
}
const results = await cacheDeleteLike.deleteKeyLike(undefined);
expect(results).toBe(0);
done();
});
it('deleteKeyLike by match (keys notfound)', async (done) => {
const cacheDeleteLike = new DatabaseCache({
db: 1,
host: 'localhost',
port: 6379,
});
const prefix = 'dellike3-';
const keys = [
{ key: `${prefix}998800`, value: JSON.stringify({ key: `${prefix}998800` }) },
{ key: `${prefix}998811`, value: JSON.stringify({ key: `${prefix}998811` }) },
{ key: `${prefix}998822`, value: JSON.stringify({ key: `${prefix}998822` }) },
{ key: `${prefix}998833`, value: JSON.stringify({ key: `${prefix}998833` }) },
];
for (let index = 0; index < keys.length; index += 1) {
const element = keys[index];
// eslint-disable-next-line no-await-in-loop
await cacheDeleteLike.set(element.key, element.value, { TTL: 3000, type: 'EX' });
}
const results = await cacheDeleteLike.deleteKeyLike('dellike4-*');
expect(results).toBe(0);
done();
});
it('flushdb', async (done) => {
const cacheFlushdb = new DatabaseCache({
db: 4,
host: 'localhost',
port: 6379,
});
const results = await cacheFlushdb.flusbdb();
expect(results).toBe('OK');
done();
});
it('subscribe', async (done) => {
const settings = {
db: 0,
host: 'localhost',
port: 6379,
};
const publisher = new DatabaseCache(settings);
const subscriber = new DatabaseCache(settings);
const myKey = 'somekey';
publisher
.getClient()
.send_command('config', ['set', 'notify-keyspace-events', 'Ex'], () => {
const key = `__keyevent@${settings.db}__:expired`;
// eslint-disable-next-line max-nested-callbacks
subscriber.subscribe(key, (_, expiredKey) => {
expect(expiredKey).toEqual(myKey);
done();
});
});
await publisher.set(myKey, 'someValue', { TTL: 2, type: 'EX' });
});
});
describe('> Cluster Mode', () => {
let settings;
let publisher;
let subscriber;
beforeAll(() => {
settings = {
cluster: true,
db: 0,
nodes: [
{ host: '127.0.0.1', port: 7000 },
{ host: '127.0.0.1', port: 7001 },
{ host: '127.0.0.1', port: 7002 },
{ host: '127.0.0.1', port: 7003 },
{ host: '127.0.0.1', port: 7004 },
{ host: '127.0.0.1', port: 7005 },
],
options: {
scaleReads: 'slave',
},
};
publisher = new DatabaseCache(settings);
subscriber = new DatabaseCache(settings);
});
it('getClient return instance Redis.Cluster when cluster mode enable', (done) => {
expect(publisher.getClient()).toBeInstanceOf(Redis.Cluster);
done();
});
it('scan in cliuster mode', async (done) => {
const prefix = '{t1}'; // hashtag is keys
const keys = [
{ key: `${prefix}-998800`, value: JSON.stringify({ key: `${prefix}-998800` }) },
{ key: `${prefix}-998811`, value: JSON.stringify({ key: `${prefix}-998811` }) },
{ key: `${prefix}-998822`, value: JSON.stringify({ key: `${prefix}-998822` }) },
{ key: `${prefix}-998833`, value: JSON.stringify({ key: `${prefix}-998833` }) },
{ key: '{t2}-998833', value: JSON.stringify({ key: 't2-998833' }) },
];
for (let index = 0; index < keys.length; index += 1) {
const element = keys[index];
// eslint-disable-next-line no-await-in-loop
await publisher.set(element.key, element.value, { TTL: 3000, type: 'EX' });
}
const results = await publisher.scan({
match: `${prefix}*`,
});
expect(results.length).toBe((keys.length - 1));
expect(results).toContain(keys[0].key);
expect(results).toContain(keys[1].key);
expect(results).toContain(keys[2].key);
expect(results).toContain(keys[3].key);
done();
});
it('flushdb in cluster mode', (done) => {
expect(publisher.flusbdb())
.rejects
.toThrow();
done();
});
it('subscribe in cluster mode', async (done) => {
const myKey = 'somekey';
const key = `__keyevent@${settings.db}__:expired`;
await publisher.set(myKey, 'someValue', { TTL: 2, type: 'Ex' });
await subscriber.subscribe(key, (_msg, expiredKey) => {
expect(expiredKey).toEqual(myKey);
done();
});
});
});
});
除了最后一个是这样的之外,所有测试都在工作:
it('subscribe in cluster mode', async (done) => {
const myKey = 'somekey';
const key = `__keyevent@${settings.db}__:expired`;
await publisher.set(myKey, 'someValue', { TTL: 2, type: 'Ex' });
await subscriber.subscribe(key, (_msg, expiredKey) => {
expect(expiredKey).toEqual(myKey);
done();
});
});
解决方案
推荐阅读
- typescript - Typescript for newbies: can be typeA or typeB situation
- ios - 为什么我的 html 没有加载到我的 WKWebView
- c# - 分组类
- c++ - QT 5 - 无法连接以使用其他类中的插槽
- c# - 如何在 Selenium 上获取 SVG 的最后一个元素
- r - 如何将 sparlyr 连接到 spark 独立集群
- mysql - 公式 `mysql` 未安装
- php - 将嵌套数组中的“点表示法”键扩展到子数组
- mysql - AWS EB 操作错误“丢失与 MySQL 服务器的连接”
- java - java - 如何在java HashMap中根据产品类别获取产品详细信息?