首页 > 解决方案 > 如何使用异步函数序列化 onNext 调用的 RxJS 订阅?

问题描述

这是这个问题的后续。

因此,当我在异步方法中等待时,RxJS 正在使用下一个事件调用我的异步函数,因为前一个事件已经完成。

我需要序列化对此异步函数的调用。

我从这个答案中了解到一个类似的问题,我需要将我的异步函数从订阅中移开并使用 concatMap。

现在我的代码没有编译并出现以下错误:

错误 TS2339:“可观察”类型上不存在属性“concatMap”。

我的代码(试图调整):

1/ 新的订阅代码(不会编译):

this.emitter = fromEventPattern(this.addHandler, this.removeHandler, (err, char) => [err, char]); <= unchanged
this.rxSubscription = this.rxSubscription = this.emitter.concatMap(value:any => this.handleUpdatedValuesComingFromSensor(value)).subscribe(); <= concatMap does not exist on type Observable<any>

2/ 供您参考的异步功能:

       handleUpdatedValuesComingFromSensor = async (arr: any[]): Promise<void> => {
   ...
   await someMethodAsync();
   ...
}

concatMap 应该用于另一种类型的源,但我无法弄清楚。

提前致谢。

标签: typescriptrxjs

解决方案


As Kos stated, in Rxjs v6, pipeable operators became the norm, moving away from chaining everything together with .. I assume since you're using fromEventPattern, instead of Observable.fromEventPattern that you are using rxjs v6+, in which case, you need to wrap concatMap() inside a pipe().

this.rxSubscription = this.emitter.pipe(concatMap(value:any => this.handleUpdatedValuesComingFromSensor(value))).subscribe()

推荐阅读