首页 > 解决方案 > 如何在 RxJs 订阅中添加缓冲和去抖动限制

问题描述

我想使用 RxJs 实现以下目标:

  1. 与上一条消息相距约 200 毫秒的群组消息
  2. 250ms内没有收到新消息时发送消息组
  3. 当组达到 10 项时发出组消息。

感谢关于 SO 的其他几个问题,例如this one,使用 and 的组合很容易实现 1 和 2 bufferdebounceTime如下所示:

const subject$ = new Subject<number>();

// Create the debounce
const notifier$ = subject$.pipe(
  debounceTime(250)
);

// Subscribe to the subject using buffer and debounce
subject$
  .pipe(
    buffer(notifier$)
  )
  .subscribe(value => console.log(value));

// Add a number to the subject every 200ms untill it reaches 10
interval(200)
  .pipe(
    takeWhile(value => value <= 10),
  )
  .subscribe(value => subject$.next(value));

只要在最后一条消息的 200 毫秒内发出消息,这里就会缓冲消息。如果花费超过 200 毫秒,则启动一个新缓冲区。但是,如果消息持续在 200 毫秒以下,则消息可能会被永久缓冲。这就是我想对缓冲区大小添加硬限制的原因。

我在StackBlitz创建了一个示例来演示缓冲区去抖动。但我不知道如何限制缓冲区,以便它在达到 10 个项目时也发出。

标签: angularrxjs

解决方案


我们可以创建另一个通知器来限制项目的数量(例如 with elementAt),使用首先发出的通知器(with race)并递归地应用它(with expand):

const notifierDebouncing$ = subject$.pipe(
  debounceTime(PERIOD),
  take(1)
);

const notifierLimiting$ = subject$.pipe(
  elementAt(AMOUNT - 1)
);

const notifier$ = interval(0).pipe(
  take(1),
  expand(_ => race(notifierLimiting$, notifierDebouncing$))
);

subject$
  .pipe(buffer(notifier$))
  .subscribe(value => console.log(value));

你怎么看?

这是一个基于您的演示应用程序的示例:https : //stackblitz.com/edit/rxjs-buffer-debounce-cf4qjy(打开控制台,然后将光标移动 2000 毫秒并停止 500 毫秒)


推荐阅读