首页 > 解决方案 > 在nodejs中等待与rabbit的正确连接

问题描述

我尝试为 amqplib/callback_api 编写我的简单事件发射器包装器。当兔子不可用或断开连接时,我无法处理情况。我有getConnect返回 Promise 的方法,该方法在建立连接时解决。但是如果连接被拒绝,Promise 显然会拒绝。如何在连接未建立时强制此方法重新连接

/**
     * Async method getConnect for connection
     * @returns {Promise<*>}
     */
    getConnect = async () => {
        return new Promise((resolve, reject) => {
            amqp.connect(this.config.url, async function(err, conn) {
                    if (err) {
                        reject(err);
                    }
                    resolve(conn);
            })
        })
    };

整个代码在这里https://github.com/kimonniez/rabbitEE

也许,我已经很困了,但我完全糊涂了:)提前谢谢!

标签: javascriptnode.jsrabbitmqnode-amqplib

解决方案


把你的Promise里面包裹起来Observable

Promise不是为处理“重试”逻辑而构建的。如果你想这样做,你应该使用library查看Observables。这将允许您在捕获错误时使用任意时间间隔重试。rxjs

const { from, interval, of } = rxjs;
const { catchError, mergeMap, tap, skipWhile, take } = rxjs.operators;

const THRESHOLD = 3;
const RETRY_INTERVAL = 1000;

// Equivalent to 'amqp.connect'
const functionThatThrows = number =>
  number < THRESHOLD
    ? Promise.reject(new Error("ERROR"))
    : Promise.resolve("OK");

// Equivalent to `getConnect`
const getConnect = () =>
  interval(RETRY_INTERVAL)
    .pipe(
      mergeMap(x => from(functionThatThrows(x)).pipe(catchError(e => of(e)))),
      skipWhile(x => {
        const isError = x instanceof Error;
        if (isError) console.log('Found error. Retrying...');
        return isError;
      }),
      take(1)
    ).toPromise();

// Resolve only if the inner Promise is resolved
getConnect().then(console.log);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>

解释

  1. interval使用of创建一个源1000。这意味着它将每秒重试一次
  2. 打电话给你amqp.connect,这相当于functionThatThrows我的例子
  3. 使用运算符捕获错误catchError并返回
  4. 在返回的对象是错误时跳过。仅当您已解决且未被拒绝时,这将允许您解决Promise
  5. 使用第一个解决的结果take(1)
  6. 使用实用函数将您的observable转换为 PromisetoPromise
  7. then调用您的函数并像使用标准一样附加Promise

推荐阅读