javascript - 防止在 RxJS 中过早完成异步管道操作符
问题描述
我正在使用 RxJS 6 创建可管道操作符complete()
,并且不清楚当操作是异步时如何观察观察者。
对于同步操作,逻辑很简单。在下面的示例中,来自源的所有值Observable
都将传递给observer.next()
,然后observer.complete()
被调用。
const syncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => observer.next(x),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
但是,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 的调用表示setTimeout()
。显然,observer.complete()
将在任何值传递给observer.next()
.
const asyncOp = () => (source) =>
new rxjs.Observable(observer => {
return source.subscribe({
next: (x) => setTimeout(() => observer.next(x), 100),
error: (e) => observer.error(err),
complete: () => observer.complete()
})
});
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>
所以问题是:什么是惯用的 RxJS 方法,以便observer.complete()
仅在所有值都异步传递给之后才进行调用observer.next()
?我应该手动跟踪待处理的呼叫还是有更“被动”的解决方案?
(请注意,上面的示例是对我的实际代码的简化,并且调用setTimeout()
旨在表示“任何异步操作”。我正在寻找一种通用方法来处理可管道运算符中的异步操作,而不是关于如何处理的建议处理 RxJS 中的延迟或超时。)
解决方案
一种思路可能是重组您asyncOp
以使用其他运算符,例如mergeMap
.
这是使用这种方法重现您的示例的代码
const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));
这是否值得考虑取决于您的asyncOp
工作。如果它是异步的,因为它依赖于一些回调,例如在 https 调用或从文件系统读取的情况下,那么我认为这种方法可以工作,因为您可以将基于回调的函数转换为 Observable。
推荐阅读
- ubuntu - Google 计算引擎 - 如何自定义用户主目录?
- php - 无法将表单发布(CURL)到 EPayment 网关无法发布(CURL)表单到 EPayment 网关
- 2sxc - 添加下拉菜单 Bootstrap Instant 4
- apache-spark - 为什么每一行都必须是唯一的?
- css - CSS旋转不能正确过渡超过225度?
- rest - 如何使用 spring、apache cxf、maven 实现 POC 并与 servicemix 7.0.1 集成
- java - slf4j.Marker 不兼容类型
- r - 在随后的更多列上使用应用功能
- c# - 修改将使用 msi 安装程序安装的 .XML。C#
- php - 如何获取多个日期的最新日期?