rxjs - 去抖和缓冲一个 rxjs 订阅
问题描述
我有一个消息队列处理器,可以将消息提供给服务......
q.on("message", (m) => {
service.create(m)
.then(() => m.ack())
.catch(() => n.nack())
})
该服务使用 RxJS Observable 并订阅debounceTime()
这些请求。
class Service {
constructor() {
this.subject = new Subject()
this.subject.debounceTime(1000)
.subscribe(({ req, resolve, reject }) =>
someOtherService.doWork(req)
.then(() => resolve())
.catch(() => reject())
)
}
create(req) {
return new Promise((resolve, reject) =>
this.subject.next({
req,
resolve,
reject
})
)
}
}
问题是只有去抖动的请求才会被确认/取消。如何确保订阅也解决/拒绝其他请求? bufferTime()
让我在那里的一部分,但它不会重置每次调用的超时持续时间next()
。
解决方案
对于那些正在寻找 RXJS 6 解决方案的人,我创建了一个自定义运算符,其行为类似于上一个答案中的debounce()
+ buffer()
。
我调用了它bufferDebounce
,Typescript 中带有类型推断的片段在这里:
import { Observable, OperatorFunction } from 'rxjs'
import { buffer, debounceTime } from 'rxjs/operators'
type BufferDebounce = <T>(debounce: number) => OperatorFunction<T, T[]>;
const bufferDebounce: BufferDebounce = debounce => source =>
new Observable(observer =>
source.pipe(buffer(source.pipe(debounceTime(debounce)))).subscribe({
next(x) {
observer.next(x);
},
error(err) {
observer.error(err);
},
complete() {
observer.complete();
},
}),
);
您可以在此示例中测试其行为以检查这是否适合您https://stackblitz.com/edit/rxjs6-buffer-debounce
推荐阅读
- java - 数组中有字符串和整数的二维冒泡排序java程序
- android-studio - 运行“应用程序”时出错:未找到默认活动(Android Studio)
- reactjs - 如何获取 React Native 镜像失败错误码
- r - 计算字符串序列的平均值,删除任何 1SD 或更大的值,然后用平均值替换删除的值
- php - 带有 PHP 错误和存储过程的 sqlsrv_execute 错误
- excel - MergeWorkbooks - 在第 21 个单元格中找到的值
- regex - sed/grep - 显示第 1 列和第 5 列,删除第 5 列中的逗号并交换 Last-First,并在 /etc/passwd 中按姓氏字母顺序排序
- javascript - 如何替换一个json对象的所有属性名
- reactjs - 使用 next.js 和样式化组件重新加载时的 Flash Of Unstyled Text (FOUT)
- pandas - 获取与不同列中的日期和时间相对应的行号