首页 > 解决方案 > 通过多个请求捕获可观察对象的最后一个值

问题描述

我有一个服务层负责处理数据并将其发送到使用相同值的组件。

我需要向 api 发出请求,只要值是isProcessed == false它每半秒就会进行 10 次尝试。

如果在尝试期间失败,它只会再发出 3 个请求。

如果isProcessed == true,停止请购单。

所有逻辑都是内置的,但我无法输出最后一个可观察到的值。发送所有响应。

所有请求都是由 observables 发送的,无论是假的还是真的,但我只需要最后一个。

在此处输入图像描述

这里所有请求响应都到达组件,而不仅仅是最后一个

负责访问 API 的服务:

public getPositionConsolidate(): Observable<PosicaoConsolidada> {       
    return this.http.get<PosicaoConsolidada>(`${environment.api.basePosicaoConsolidada}/consolidado`)
      .pipe(
        map(res => res),
        retryWhen(genericRetryStrategy()),
        shareReplay(1),
        catchError(err => {
            console.log('Error in Position Consolidate', err);
            return throwError(err);
        })
    )
}

负责处理数据并将其发送到组件的服务:

public positionConsolidate() {
    let subject = new BehaviorSubject<any>([]);
    this.api.getPositionConsolidate().subscribe(response => {
        if(response.hasProcessado == false) {
            for (let numberRequest = 0; numberRequest < 10; numberRequest++) {
                setTimeout(() => {
                   //subject.next(this.api.getPosicaoConsolidada().subscribe());
                   this.api.getPositionConsolidate().subscribe(res => {
                        subject.next(res)
                   })
                }, numberRequest * 500, numberRequest);
            }
        } else {
            retryWhen(genericRetryStrategy()),
            finalize(() => this.loadingService.loadingOff())
        }   
    })
    return subject.asObservable()
}

在组件中:

public ngOnInit() {
 this.coreState.positionConsolidate().subscribe(res => console.log(res))
}

标签: javascriptangulartypescriptrxjs

解决方案


回答您的问题最简单的部分是,如果您只想从可观察到的最后一个发射,那么只需使用最后一个运算符。但是,您编写事物的方式使其难以合并。以下将您的代码重构为没有任何非 rxjs 控制结构的单个流。

public positionConsolidate() {

  return this.api.getPositionConsolidate().pipe(
    concatMap(res => iif(() => res.hasProcessado,
      of(res),
      interval(500).pipe(
        take(10),
        concatMap(() => this.api.getPositionConsolidate())
      )
    )),
    retryWhen(genericRetryStrategy()),
    finalize(() => this.loadingService.loadingOff()),
    last()
  );
}

发生了什么

  • 首先这会执行初始的 api 调用。
  • 然后根据结果,要么...
    • 返回初始结果
    • 每 500 毫秒调用 10 次 api。
  • 实现你所拥有的retryWhenfinalize
  • 返回最后发出的结果。

也不要订阅可观察对象内部的可观察对象 - 这就是concatMap等高阶可观察对象的用途。


推荐阅读