首页 > 解决方案 > 使用 RxJS 延迟一批 observables

问题描述

我对我的数据库执行 http 请求,并注意到如果我一次发送所有请求,其中一些请求会出现超时错误。我想在调用之间添加一个延迟,这样服务器就不会超载。我正在尝试找到解决此问题的 RxJS 解决方案,并且不想添加setTimeout.

这是我目前所做的:

let observables = [];
for(let int = 0; int < 10000; int++){
   observables.push(new Observable((observer) => {
      db.add(doc[int], (err, result)=>{
         observer.next();
         observer.complete();
      })
   }))
}

forkJoin(observables).subscribe(
   data => {
   },
   error => {
      console.log(error);
   },
   () => {
      db.close();
   }
);

标签: rxjsdelay

解决方案


你确实可以用 Rxjs 很好地实现这一点。您将需要更高阶的可观察对象,这意味着您会将可观察对象发射到可观察对象中,而高阶可观察对象将为您解决这个问题。

这种方法的好处是您可以轻松地在 // 中运行 X 请求,而无需自己管理请求池。

这是工作代码:

import { Observable, Subject } from "rxjs";
import { mergeAll, take, tap } from "rxjs/operators";

// this is just a mock to demonstrate how it'd behave if the API was
// taking 2s to reply for a call
const mockDbAddHtppCall = (id, cb) =>
  setTimeout(() => {
    cb(null, `some result for call "${id}"`);
  }, 2000);

// I have no idea what your response type looks like so I'm assigning
// any but of course you should have your own type instead of this
type YourRequestType = any;

const NUMBER_OF_ITEMS_TO_FETCH = 10;

const calls$$ = new Subject<Observable<YourRequestType>>();

calls$$
  .pipe(
    mergeAll(3),
    take(NUMBER_OF_ITEMS_TO_FETCH),
    tap({ complete: () => console.log(`All calls are done`) })
  )
  .subscribe(console.log);

for (let id = 0; id < NUMBER_OF_ITEMS_TO_FETCH; id++) {
  calls$$.next(
    new Observable(observer => {
      console.log(`Starting a request for ID "${id}""`);
      mockDbAddHtppCall(id, (err, result) => {
        if (err) {
          observer.error(err);
        } else {
          observer.next(result);
          observer.complete();
        }
      });
    })
  );
}

以及 Stackblitz 上的现场演示:https ://stackblitz.com/edit/rxjs-z1x5m9

请打开浏览器的控制台,注意其中 3 个呼叫触发时显示的控制台日志会立即启动,然后等待 1 个完成后再接听另一个。


推荐阅读