javascript - 在nodejs中等待与rabbit的正确连接
问题描述
我尝试为 amqplib/callback_api 编写我的简单事件发射器包装器。当兔子不可用或断开连接时,我无法处理情况。我有getConnect
返回 Promise 的方法,该方法在建立连接时解决。但是如果连接被拒绝,Promise 显然会拒绝。如何在连接未建立时强制此方法重新连接
/**
* Async method getConnect for connection
* @returns {Promise<*>}
*/
getConnect = async () => {
return new Promise((resolve, reject) => {
amqp.connect(this.config.url, async function(err, conn) {
if (err) {
reject(err);
}
resolve(conn);
})
})
};
整个代码在这里https://github.com/kimonniez/rabbitEE
也许,我已经很困了,但我完全糊涂了:)提前谢谢!
解决方案
把你的Promise
里面包裹起来Observable
Promise
不是为处理“重试”逻辑而构建的。如果你想这样做,你应该使用library查看Observables。这将允许您在捕获错误时使用任意时间间隔重试。rxjs
const { from, interval, of } = rxjs;
const { catchError, mergeMap, tap, skipWhile, take } = rxjs.operators;
const THRESHOLD = 3;
const RETRY_INTERVAL = 1000;
// Equivalent to 'amqp.connect'
const functionThatThrows = number =>
number < THRESHOLD
? Promise.reject(new Error("ERROR"))
: Promise.resolve("OK");
// Equivalent to `getConnect`
const getConnect = () =>
interval(RETRY_INTERVAL)
.pipe(
mergeMap(x => from(functionThatThrows(x)).pipe(catchError(e => of(e)))),
skipWhile(x => {
const isError = x instanceof Error;
if (isError) console.log('Found error. Retrying...');
return isError;
}),
take(1)
).toPromise();
// Resolve only if the inner Promise is resolved
getConnect().then(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>
解释
interval
使用of创建一个源1000
。这意味着它将每秒重试一次- 打电话给你
amqp.connect
,这相当于functionThatThrows
我的例子 - 使用运算符捕获错误
catchError
并返回 - 在返回的对象是错误时跳过。仅当您已解决且未被拒绝时,这将允许您解决
Promise
- 使用第一个解决的结果
take(1)
- 使用实用函数将您的observable转换为 Promise
toPromise
then
调用您的函数并像使用标准一样附加Promise
推荐阅读
- flutter - How do I send an email through a Contact Form using Flutter Web?
- php - 警告:imap_open():无法打开流 {imap.gmail.com:993/imap/ssl
- assembly - Scrolling the terminal sideways (BIOS)
- html - 来自谷歌字体的字体系列流沙在 Mozilla Firefox 上被破坏
- apache-kafka - 由于没有身份验证问题,kafka 启动失败
- php - 从数组中选择的 Laravel Blade 多选选项
- javascript - 从另一个数组中过滤出一组项目
- python - 使用 bs4 python 抓取图像
- python - How to check the folder for file and then read the file in python
- azure - 在新数据流中将现有数据集与 Azure 数据工厂一起使用