angular - 如何在 RxJs 订阅中添加缓冲和去抖动限制
问题描述
我想使用 RxJs 实现以下目标:
- 与上一条消息相距约 200 毫秒的群组消息
- 250ms内没有收到新消息时发送消息组
- 当组达到 10 项时发出组消息。
感谢关于 SO 的其他几个问题,例如this one,使用 and 的组合很容易实现 1 和 2 buffer
,debounceTime
如下所示:
const subject$ = new Subject<number>();
// Create the debounce
const notifier$ = subject$.pipe(
debounceTime(250)
);
// Subscribe to the subject using buffer and debounce
subject$
.pipe(
buffer(notifier$)
)
.subscribe(value => console.log(value));
// Add a number to the subject every 200ms untill it reaches 10
interval(200)
.pipe(
takeWhile(value => value <= 10),
)
.subscribe(value => subject$.next(value));
只要在最后一条消息的 200 毫秒内发出消息,这里就会缓冲消息。如果花费超过 200 毫秒,则启动一个新缓冲区。但是,如果消息持续在 200 毫秒以下,则消息可能会被永久缓冲。这就是我想对缓冲区大小添加硬限制的原因。
我在StackBlitz创建了一个示例来演示缓冲区去抖动。但我不知道如何限制缓冲区,以便它在达到 10 个项目时也发出。
解决方案
我们可以创建另一个通知器来限制项目的数量(例如 with elementAt
),使用首先发出的通知器(with race
)并递归地应用它(with expand
):
const notifierDebouncing$ = subject$.pipe(
debounceTime(PERIOD),
take(1)
);
const notifierLimiting$ = subject$.pipe(
elementAt(AMOUNT - 1)
);
const notifier$ = interval(0).pipe(
take(1),
expand(_ => race(notifierLimiting$, notifierDebouncing$))
);
subject$
.pipe(buffer(notifier$))
.subscribe(value => console.log(value));
你怎么看?
这是一个基于您的演示应用程序的示例:https : //stackblitz.com/edit/rxjs-buffer-debounce-cf4qjy(打开控制台,然后将光标移动 2000 毫秒并停止 500 毫秒)
推荐阅读
- java - 如何在 Eclipse 微配置文件中将 JSON 值转换为 JsonWebToken
- node.js - 对象中的 ejs 数组
- selenium-webdriver - 无法从 Selenium 启动 IE - 保护模式设置对于所有区域都不相同
- excel - 打开一个工作簿,然后复制其名称可以不同的第三个选项卡?
- python-3.x - 是否可以在 Python3 多重继承中将 super() 与所有父级一起使用?
- javascript - jQuery validate,如何为动态生成的字段制定验证规则?
- cmake - Ninja 无法识别 CMake 生成器表达式
- reactjs - 将expandIcon图标向右移动并从扩展表中删除空白区域Ant Design reatc js
- python - 具有曼哈顿距离的 Siamese BiLSTM 神经网络每次对相同的测试数据给出非常不同的相似度分数
- react-native - React Native Flex 两行有不寻常的空间