首页 > 解决方案 > 如何合并使用 FROM 创建的 observables 数组的所有映射

问题描述

我正在使用 FROM 运算符创建一个可观察对象数组,每个可观察对象都使用 margeMap 进行转换。首先,使用此 uploadService.addFile 将项目添加到存储库,然后如果未将 File 标记为已上传,则使用 uploadService.uploadFile 进行上传,此服务会发出上传进度,因此我添加了一个过滤器以仅在文件时发出已上传或未上传

我想要做的是只有一个发射表明所有文件都已上传

我尝试使用 forkJoin,但使用该运算符我没有任何发射,我认为是因为在某些时候我需要指示操作在最后一个管道内完成,但我不知道如何执行此操作。

/*this.therapyFiles this is an array of elements*/
from(this.therapyFiles).pipe(
      mergeMap(therapyFile => this.fileParser.parseTherapyEndTime(therapyFile.file).pipe(
        map(date => {
          therapyFile.endTime = date;
          return therapyFile;
        })
      )),
      mergeMap(therapyFile =>
        this.uploadService.addFile(
          this.facilityId,
          therapyFile.idDevice,
          therapyFile.file.name,
          therapyFile.endTime
        ).pipe(
          switchMap(file => file.isUploaded ?
            of(this.AlreadyUploaded) :
            this.uploadService.uploadFile(therapyFile.file)),
          map(progress => {
            therapyFile.uploadProgress = progress;
            return therapyFile;
          }),
          filter(uploadedFile => uploadedFile.uploadProgress > 99 || uploadedFile.uploadProgress < 0)
        )
      )
    ).subscribe(z => console.log(z)); // here I get all emition for every item in therapyFiles
/*this.therapyFiles this is an array of elements*/
forkJoin(from(this.therapyFiles).pipe(
      mergeMap(therapyFile => this.fileParser.parseTherapyEndTime(therapyFile.file).pipe(
        map(date => {
          therapyFile.endTime = date;
          return therapyFile;
        })
      )),
      mergeMap(therapyFile =>
        this.uploadService.addFile(
          this.facilityId,
          therapyFile.idDevice,
          therapyFile.file.name,
          therapyFile.endTime
        ).pipe(
          switchMap(file => file.isUploaded ?
            of(this.AlreadyUploaded) :
            this.uploadService.uploadFile(therapyFile.file)),
          map(progress => {
            therapyFile.uploadProgress = progress;
            return therapyFile;
          }),
          filter(uploadedFile => uploadedFile.uploadProgress > 99 || uploadedFile.uploadProgress < 0)
        )
      )
    )).subscribe(z => console.log(z)); // I tried in this way but never get an emition, but all inside code of forkjoin works as expected

我希望只有一个发射表明所有文件都已上传

标签: angularrxjs

解决方案


使用 concatAll

   of(this.therapyFiles).pipe(
      mergeMap(therapyFile => this.fileParser.parseTherapyEndTime(therapyFile.file).pipe(
        map(date => {
          therapyFile.endTime = date;
          return therapyFile;
        })
      )),
      mergeMap(therapyFile =>
        this.uploadService.addFile(
          this.facilityId,
          therapyFile.idDevice,
          therapyFile.file.name,
          therapyFile.endTime
        ).pipe(
          switchMap(file => file.isUploaded ?
            of(this.AlreadyUploaded) :
            this.uploadService.uploadFile(therapyFile.file)),
          map(progress => {
            therapyFile.uploadProgress = progress;
            return therapyFile;
          }),
          filter(uploadedFile => uploadedFile.uploadProgress > 99 || uploadedFile.uploadProgress < 0)
        )
      ),
      concatAll()
    ).subscribe(z => console.log(z)); // I tried in

推荐阅读