首页 > 解决方案 > 使用 RXJS 继续以角度触发 http 调用,直到满足条件

问题描述

我正在调用 spotify api,它返回一个像这样的对象:

{
  next: 'https://api.spotify.com/v1/me/tracks?offset=100&limit=50'
  items: [...]
}

其中 items 是刚刚进行的网络调用的结果, next 是用于获取下一个“页面”结果的 url。我想做的是在没有偏移的情况下进行初始调用,然后继续进行网络调用,直到下一个为空,这意味着用户没有更多的项目可以获取。如果没有更多项目,他们将返回 null 以供下一个。

这似乎是可能的,但似乎无法弄清楚如何正确地做到这一点。我正在寻找的是这样的:

  readonly baseSpotifyUrl: string = 'https://api.spotify.com/v1';

constructor(private http: HttpClient) {}


public getTracks(url?: string): Observable<any> {
    return this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`);
  }

public getAllTracks(): Observable<any> {
    const tracks$: Subject<any> = new Subject();
    let next: string = '';
    this.getTracks(next)
      .subscribe({
        next: (res: any): void => {
          tracks$.next(res.items);
          next = res.next;
          // if res.next !== null, do this again now that we have set next to res.next
        },
        error: (err: any): void => {
          console.error(err);
        }
      });
    return tracks$;
  }

这里的想法是,我的组件将调用 getAllTracks() 并接收一个主题,然后将不断推送新项目通过该主题,直到检索到所有项目。我似乎无法弄清楚如何在前一个请求返回时仅在有更多项目要获取的情况下发出新的网络请求(res.next!== null)

编辑 - - - - - - - - - - - - - - - - - - - - - - - - - ----------

这完成了工作,但我觉得它是垃圾:

  public getAllTracksSegment(itemsSubject: Subject<any>, nextSubject: Subject<string>, url?: string): void {
    this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`).subscribe({
      next: (res: any): void => {
        itemsSubject.next(res.items);
        nextSubject.next(res.next);
      }
    });
  }

  public getAllTracks(): Observable<any> {
    const tracks$: Subject<any> = new Subject();
    const next$: Subject<string> = new Subject();
    next$.subscribe({
      next: (next: any): void => {
        if (next !== null) {
          this.getAllTracksSegment(tracks$, next$, next);
        }
      }
    });
    next$.next('');
    return tracks$;
  }

标签: angulartypescriptasynchronousrxjsobservable

解决方案


如果我理解正确的问题,我会使用expand操作员来构建解决方案。

这是我将使用的代码。评论是内联的

public getTracks(url?: string): Observable<any> {
    return this.http.get(url && url?.length > 0 ? url : `${this.baseSpotifyUrl}/me/tracks?limit=50`);
}

public getAllTracks(): Observable<any[]> {
  // the first call is with no parameter so that the default url with no offset is used
  return getTracks().pipe(
     // expand is used to call recursively getTracks until next is null
     expand(data => data.next === null ? EMPTY : getTracks(data.next)),
     // with tap you can see the result returned by each call
     tap(data => console.log(data)),
     // if you want you can use the reduce operator to eventually emit the 
     // accumulated array with all items
     reduce((acc, val) => {
       acc = [...acc, ...val.items]
       return acc
     }, [])
  )
}

// now you can fire the execution of the recursive calls by subscribing
// to the observable returned by getAllTracks
getAllTracks().subscribe(
   // allItems is an array containing all the items returned by the various calls
   allItems => console.log(allItems)
)

@skyleguy 评论后的补充说明

运算符tap用于实现副作用。换句话说,它从上游接收所有通知,对通知的数据做任何需要做的事情,然后将相同的通知传递给下游。无需从传递给tap操作员的函数中返回任何内容。在应用副作用后,上游只是通过下游。在此示例中,副作用只是在控制台上打印与通知一起传递的数据。

reduce使用的pipereduceRxJs 的运算符,而不是 的reduce方法Array。RxJsreduce算子会累积所有从上游通知的数据,并且在上游s时只发出一个值。complete所以,在这个例子中,每次调用远程函数返回一些东西时,这个东西就会进入reduce运算符并有助于累加逻辑。当expand返回EMPTYObservable 时,在递归结束时,EMPTYObservable 只是completes 而不通知任何东西,这意味着上游completes因此reduce可以发出它的第一个也是唯一的通知,即所有项目累积的数组,然后是complete

这个 stackblitz通过模拟远程调用复制了这个逻辑。


推荐阅读