首页 > 解决方案 > 用于下载媒体文件和调度进度的 NgRX 效果;如何处理流和节流

问题描述

我在理解和为我的剧集下载选项应用@Effects 方面有些挣扎。我在另一个问题上得到了一些帮助,这是我最新的混合物。

快速概览:用户单击下载会调度DOWNLOAD_EPISODE在第一个效果中捕获的动作。下载调用返回一个 HttpEvent 流和一个最终的 HttpResponse。

期间event.type === 3我要报告下载进度。当event.type === 4身体到达时,我可以称之为成功,例如可以创建一个 Blob。

服务episodesService

download( episode: Episode ): Observable<HttpEvent<any>> | Observable<HttpResponse<any>> {

  // const url = encodeURIComponent( episode.url.url );
  const url = 'https%3A%2F%2Fwww.sample-videos.com%2Faudio%2Fmp3%2Fcrowd-cheering.mp3';
  const req = new HttpRequest( 'GET', 'http://localhost:3000/episodes/' + url, {
    reportProgress: true,
    responseType: 'blob'
  } );
  return this.http.request( req );
}

downloadSuccess( response: any ): Observable<any> {
  console.log( 'calling download success', response );
  if ( response.body ) {
    var blob = new Blob( [ response.body ], { type: response.body.type } );
    console.log( 'blob', blob );
  }
  return of( { status: 'done' } );
}

getHttpProgress( event: HttpEvent<any> | HttpResponse<Blob> ): Observable<DownloadProgress> {

  switch ( event.type ) {
    case HttpEventType.DownloadProgress:
      const progress = Math.round( 100 * event.loaded / event.total );
      return of( { ...event, progress } );

    case HttpEventType.Response:
      const { body, type } = event;
      return of( { body, type, progress: 100 } );

    default:
      return of( { ...event, progress: 0 } );
  }
}

效果:

@Effect()
downloadEpisode$ = this.actions$.pipe(
  ofType<episodeActions.DownloadEpisodes>( episodeActions.DOWNLOAD_EPISODE ),
  switchMap( ( { payload } ) => this.episodesService.download( payload )
    .pipe(
      switchMap( (response: HttpEvent<any> | HttpResponse<any>) => this.episodesService.getHttpProgress( response ) ), //merge in the progress
      map( ( response: fromServices.DownloadProgress ) => {

        // update the progress in the episode
        //
        if ( response.type <= 3 ) {
          return new episodeActions.DownloadProgressEpisodes( { ...payload, download: {
            progress: response.progress
          } }  );

        // pass the Blob on the download response
        //
        } else if ( response.type === 4 ){
          return new episodeActions.DownloadEpisodesSuccess( response );
        }
      } ),
      catchError( error => of( new episodeActions.DownloadEpisodesFail( error ) ) ),
    )
  )
)

@Effect( { dispatch: false } )
processDownloadEpisodeSuccess$ = this.actions$.pipe(
  ofType<any>( episodeActions.DOWNLOAD_EPISODE_SUCCESS ),
  switchMap( ( { payload } ) => this.episodesService
    .downloadSuccess( payload ).pipe(
      tap( response => console.log( 'response', payload,response ) ),

      //  catchError(err => of(new episodeActions.ProcessEpisodesFail(error))),
    )
  )
)

我对这个解决方案非常满意,但是我不喜欢 MAP 中的 If ELSE 子句作为 DOWNLOAD_EPISODE 效果的一部分。

理想情况下,我想在那里拆分流,如果类型为 3,我想去路由 A,它总是episodeActions.DownloadProgressEpisodes使用 Episode 有效负载进行调度。每当它是类型 4(最后一个发出的事件)时,我想在流中获取 B 路由并episodeActions.DownloadEpisodesSuccess使用响应主体调用。

我尝试添加更多效果,但我总是遇到这样的情况:结果流this.episodesService.download要么是进度更新,要么是响应体。对于进度更新,我要求 Episode 记录能够调用 reducer 并更新 Episode 的进度。

我尝试将事物用作设置 filter( response => response.type === 4 )之后,DownloadProgressEpisodes但在DownloadEpisodesSuccess希望它允许在过滤器之前进行迭代以处理进度,然后将其余部分过滤到成功操作。然后我了解到这不是流的工作方式。

我也尝试last()了 which 确实有效,但它没有让我分派响应操作(控制台日志确实有效),但只会last()分派之后的 DownloadEpisodesSuccess 操作。

因此,如果可以拆分流并以event.type 3不同方式处理,那么event.type 4我会对此感兴趣。

*****更新*******

我想保留最初的问题,但是我确实遇到了限制要求,因为我为大文件调度了很多操作。我尝试了throttle操作符,但这会抑制发射,但也可以抑制响应主体的最后一个发射。我尝试将其拆分,partition但我认为这是不可能的。在一个灯泡时刻之后,我决定创建 2 个效果DOWNLOAD_EPISODE,一个只捕获所有event.type === 3并限制它,另一个效果使用last操作符并且只处理event.type === 4

见代码:

  @Effect( )
  downloadEpisode$ = this.actions$.pipe(
    ofType<episodeActions.DownloadEpisodes>( episodeActions.DOWNLOAD_EPISODE ),
    switchMap( ( { payload } ) => this.episodesService.download( payload )
      .pipe(
        switchMap( (response: HttpEvent<any> | HttpResponse<any>) => this.episodesService.getHttpProgress( response ) ),
        throttleTime( 500 ),
        map( ( response: fromServices.DownloadProgress ) => {
          console.log('Type 3', response);
          // update the progress in the episode
          if ( response.type <= 3) {
            return new episodeActions.DownloadProgressEpisodes( { ...payload, download: {
              progress: response.progress
            } }  );
          }
        } ),
        catchError( error => of( new episodeActions.DownloadEpisodesFail( error ) ) ),
      )
    )
  )

  @Effect( )
  downloadEpisodeLast$ = this.actions$.pipe(
    ofType<episodeActions.DownloadEpisodes>( episodeActions.DOWNLOAD_EPISODE ),
    switchMap( ( { payload } ) => this.episodesService.download( payload )
      .pipe(
        switchMap( (response: HttpEvent<any> | HttpResponse<any>) => this.episodesService.getHttpProgress( response ) ),
        last(),
        map( ( response: fromServices.DownloadProgress ) => {
          console.log('Type 4', response);
          if ( response.type === 4 ){
            return new episodeActions.DownloadEpisodesSuccess( response, { ...payload, download: {
              progress: response.progress
            } } );
          }
        } ),

        catchError( error => of( new episodeActions.DownloadEpisodesFail( error ) ) ),
      )
    )
  )

您认为这是最好的解决方案吗?类型 3 和 4 是分开的,我可以限制它。

*update2:它确实会产生一个问题,即在进度为 100%Progress Action的操作之后可以触发进度为 97 %。Download Success每次遇到新的挑战...

SampleTime(500)似乎有效,因为在类型 4 事件进来之后它似乎没有抛出类型 3 事件。

*update3:我刚刚发现我现在调用Download了两次效果,叹息。我回到第一方,试图限制来自 episodeService.download 的 HttpEvent 流。

标签: angular6ngrxngrx-effects

解决方案


我认为如果你不想有if else声明,你将不得不创造不同的效果。

在当前的一个中,您将调度一个新的动作,例如DownloadEpisodeProgess,使用有效负载中的类型。然后,您将创建两个侦听此操作的效果并通过类型相应地过滤它们,一个效果将用于 dispatch DownloadProgressEpisodes,另一个用于DownloadEpisodesSuccess.

另一种可能的解决方案是partition operator,但我没有将它与 NgRx Effects 结合使用。

请记住,对于您进行的每个订阅,都会产生性能成本,我个人并不介意这种if else说法。


推荐阅读