首页 > 解决方案 > ForkJoin - ForLoop - 在每次订阅后处理一些逻辑,在所有可观察对象解决后处理一些逻辑

问题描述

我在 forloop 中有一个 http 调用。APICall 将返回我正在使用它呈现在页面上的 HTML 响应。

现在我还有一个要求,在所有 API 调用完成后,我必须执行一些逻辑来更新后端的一些数据。

我知道我们可以使用 forkjoin 运算符来捕获可观察的数组,然后更新 BE 数据。但我无法理解如何处理每个订阅必须完成的要求。

for(let item of Items){
    this.myService
            .getMyItemData(item.key)
            .pipe(
              takeUntil(this.destroyed),
              distinctUntilChanged(),
              catchError((e: Error) => {
                this.logger.logError('Error loading', e);
                return of('An Error Occurred');
              })
            ).subscribe((resp) => { 

//使用forkjoin时如何处理这个订阅?

             this.elementRef.nativeElement.html = resp;
    }) 
}
  

现在,在获得所有 itemData 后,我想为后端执行更新。为此,我正在考虑使用 forkJoin 来捕获所有可观察数据。但同时我想使用订阅代码来呈现 HTML。有人可以帮助我如何实现这一目标。

我的 forkJoin 代码参考*

let arrayOfObservables  = Items.map((item) => this.myService
                .getMyItemData(item.key))

let dataSource =  Rx.Observable.forkJoin(arrayOfObservables);

dataSource.subscribe((resp) => {
  // update my BE data
})

标签: angularrxjsobservable

解决方案


forkJoin只有在所有源 observable 完成时才会发出。鉴于distinctUntilChanged()运算符的使用,我假设每个可观察对象都是一个通知流。

  1. 在那种情况下combineLatest,比 更适合forkJoin。它会在任何源 observable 发出时发出。但请注意:每个 observable 应该至少发出一次才能触发订阅。如果您希望在某些可观察对象尚未发出之前触发订阅,您可以通过管道startWith(null)连接到每个可观察对象。还要查看 RxJSzip函数。是一个快速运行 b/n 不同的功能。

  2. 要在每次发射后执行某些操作,您可以使用tap运算符(如果执行副作用)或map运算符(转换数据)。

import { of, combineLatest, Subject } from 'rxjs';
import { tap, takeUntil, startWith, distinctUntilChanged, catchError } from 'rxjs/operators';

combineLatest(
  Items.map((item) => 
    this.myService.getMyItemData(item.key).pipe(
      startWith(null),           // <-- use conditionally (see above)
      distinctUntilChanged(),
      tap((resp) => {
        if (!!resp) {            // <-- avoid `null` from `startWith`
          this.elementRef.nativeElement.html = resp;
        }
      }),
      catchError((e: Error) => {
        this.logger.logError('Error loading', e);
        return of('An Error Occurred');
      })
    )
  )
).pipe(
  takeUntil(this.destroyed)
).subscribe((resp) => {
  // update my BE data
});

推荐阅读