首页 > 解决方案 > 有条件地使用switchmap或exhaustmap angular

问题描述

抱歉,如果此问题已得到解答,但我自己搜索过并找不到。

我想要做的是调用一个从 2 个位置中的 1 个返回数据的函数。要么获取缓存数据,要么从 API 获取新数据。我正在使用 Angular 7 并且一直在尝试实现一个有条件地使用 switchMap 和排气映射的 observable。我对 Observables 有很好的理解,但我知道还有一些我不明白的地方。我正在考虑使用排气映射作为从缓存中提取的默认方法。但是如果我需要从 API 获取最新数据,我想使用 switchMap 来忽略从缓存中检索并获取新数据。

我找到了一个 stackblitz 操场并将其分叉以进行一些测试。这是我的尝试https://stackblitz.com/edit/angular7-conditional-data

这是相关的代码。

this.observable.pipe(
      map((val: string) => {
        if (/^true/.test(val)) {
          this.mergeVal = 'switch ' + val;
          return switchMap(val => {
            // console.log("SwitchMap val: ", val);
            this.switchVal = val;
            return of(val);
          })
        } else {
          this.mergeVal = 'exhaust ' + val;
          return exhaustMap(val => {
            // console.log("resolve val: ", val);
            return new Promise(resolve => {
              setTimeout(() => {
                this.exhaustVal = val;
                resolve(val);
              }, 3000);
            });
          })
        }
      })
    ).subscribe(
      data => {
      //  this.observable; 
      this.subscribeVal = 'data: ' + data;
      },
      err => {
        this.subscribeVal = 'error';
      },
      () => {
        this.subscribeVal = 'complete';
      }
    );
  }

更新 2020-06-02 我添加了一项服务,该服务可以进行 api 调用,以更好地与我将要应用的当前设置保持一致。离开了exhaustMap和switchMap,希望能在此处发帖的人的帮助下解决同样的问题

现在我认为我遇到的问题出在这部分代码中,但我不知道如何解决它

  dummyAPI = 'https://swapi.dev/api/';

  cacheSubject: Subject<any> = new Subject();
  cacheObservable: Observable<any> = this.cacheSubject.asObservable().pipe(
    startWith(this.http.get(this.dummyAPI).pipe(
      map(val => {
        console.log('startWith: ', val);
        return this.now('dataService', true);
      })
    )),
    concatMap(val => val),
    shareReplay({refCount: true})
  );

  constructor(private http: HttpClient) {}

  now(fromId, val = false) {
    return `${val} ${moment().format('hh:mm:ss')} COMP ${fromId}`;
  }

  updateDataFromAPI(fromId, getFromAPI) {
    const val = this.now(fromId, getFromAPI);
    if (getFromAPI) {
      this.cacheSubject.next(
        this.http.get(this.dummyAPI).pipe(
          tap(val => {
            console.log("tap dummyAPI before delay: ", val);
          }),
          delay(5000),
          tap(val => {
            console.log("tap dummyAPI after delay: ", val);
          }),
        )
      );
    } else {
      this.cacheSubject.next(of(val));
    }
  }

  getData(fromId, getFromAPI?) {
    if (getFromAPI) {
      this.updateDataFromAPI(fromId, getFromAPI);
    }
    return this.cacheObservable.pipe(take(1));
  }

我遇到的问题是 cacheObservable 订阅(来自 getData)没有等待 cacheSubject 值(通过 updateDataFromAPI 调用 next)在 cacheObservable 完成之前用 api 数据响应。

标签: angulartypescriptrxjs

解决方案


这行不通。因为您想根据this.observable.

的想法exhaustMap是跳过所有的发射,this.observable直到子流exhaustMap完成。

在您的情况下,我建议使用shareReplay(1)它将继续提供缓存,直到发出来自this.observable.

const cachedStream$ = this.observable.pipe(
  startWith(null), // to trigger the first load.
  switchMap(() => this.http.get('http://my-data.url')),
  shareReplay(1),
);

cachedStream$.subscribe(cached => this.cached1 = cached);
cachedStream$.subscribe(cached => this.cached2 = cached);
cachedStream$.subscribe(cached => this.cached3 = cached);

this.observable.next(); // refresh the value from the backend.

推荐阅读