首页 > 解决方案 > 防止在 RxJS 中过早完成异步管道操作符

问题描述

我正在使用 RxJS 6 创建可管道操作符complete(),并且不清楚当操作是异步时如何观察观察者。

对于同步操作,逻辑很简单。在下面的示例中,来自源的所有值Observable都将传递给observer.next(),然后observer.complete()被调用。

const syncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => observer.next(x),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(syncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

但是,对于异步操作,我有点不知所措。在下面的示例中,异步操作由对 的调用表示setTimeout()。显然,observer.complete()任何值传递给observer.next().

const asyncOp = () => (source) =>
  new rxjs.Observable(observer => {
    return source.subscribe({
      next: (x) => setTimeout(() => observer.next(x), 100),
      error: (e) => observer.error(err),
      complete: () => observer.complete()
    })
  });
  
rxjs.from([1, 2, 3]).pipe(asyncOp()).subscribe(x => console.log(x));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.2.2/rxjs.umd.min.js">
</script>

所以问题是:什么是惯用的 RxJS 方法,以便observer.complete()仅在所有值都异步传递给之后才进行调用observer.next()?我应该手动跟踪待处理的呼叫还是有更“被动”的解决方案?

(请注意,上面的示例是对我的实际代码的简化,并且调用setTimeout()旨在表示“任何异步操作”。我正在寻找一种通用方法来处理可管道运算符中的异步操作,而不是关于如何处理的建议处理 RxJS 中的延迟或超时。)

标签: javascriptasynchronousrxjsrxjs-pipeable-operators

解决方案


一种思路可能是重组您asyncOp以使用其他运算符,例如mergeMap.

这是使用这种方法重现您的示例的代码

const asyncOp = () => source => source.pipe(mergeMap(x => of(x).pipe(delay(100))));
from([1, 2, 3]).pipe(asyncOp1()).subscribe(x => console.log(x));

这是否值得考虑取决于您的asyncOp工作。如果它是异步的,因为它依赖于一些回调,例如在 https 调用或从文件系统读取的情况下,那么我认为这种方法可以工作,因为您可以将基于回调的函数转换为 Observable。


推荐阅读