首页 > 解决方案 > 对基于自定义 ReplaySubject 的 observable 的同步阻塞订阅调用

问题描述

该视图包含以下元素:

<div *ngIf="showMe">Hello</div>

调用组件方法时:

downloadDemo(): void {
  this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    console.log('Progress: ' + download.progress);
  })
}

该元素显示在所有Progress记录器之前的视图中。这就是它应该的样子。这种基于 HTTP 的下载工作得很好。

但是在调用组件方法时:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    console.log('Progress: ' + download.progress);
  })
}

Progress该元素在所有记录器之后最后显示在视图中。这不是应该的样子。这个基于自定义ReplaySubject的 observable 没有按预期工作。实际上,该元素应该在所有Progress记录器之前而不是之后显示。

我想看看是否有一个订阅呼叫被阻塞。

所以我将这两种方法更改为:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.showMe = true;
  this.download$.subscribe((download: Download) => {
    console.log('Progress: ' + download.progress);
  });
  console.log('Call done');
}

downloadDemo(): void {
  this.download$ = this.downloadService.downloadUrlAsBlobWithProgressAndSaveInFile('assets/skypeforlinux-64.deb', 'demo')
  this.showMe = true;
  this.download$.subscribe((download: Download) => {
    console.log('Progress: ' + download.progress);
  });
  console.log('Call done');
}

以下是调用该downloadDemo()方法时的记录器:

Progress: 0
Call done
Progress: 0
Progress: 0
Progress: 2
Progress: 3

我们可以看到subscribe()调用是非阻塞的。

以下是调用该downloadSoundtrack()方法时的记录器:

Progress: 96
Progress: 97
Progress: 100
Call done

我们可以看到subscribe()呼叫被阻塞。

添加显式this.detectChanges();调用没有区别:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi(soundtrack);
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(progress$, fileName);
  this.download$.subscribe((download: Download) => {
    this.showMe = true;
    this.detectChanges();
    console.log('Progress: ' + download.progress);
  })
}

它仍然显示在所有Progress记录器之后。

我还尝试了一些显式订阅来代替*ngIf="download$ | async as download"模板中的,但它没有任何帮助:

downloadInProgress(soundtrack: Soundtrack): boolean {
  let inProgress: boolean = false;
  if (soundtrack.download) {
    if (soundtrack.download.progress > 0 && soundtrack.download.progress < 100) {
      inProgress = true;
    } else if (soundtrack.download.progress == 100) {
      console.log('complete');
      soundtrack.download = undefined;
    }
  }
  console.log('inProgress ' + inProgress);
  return inProgress;
}

长期运行的服务:

public progressiveCreateSoundtrackMidi(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
  return Observable.create((progressTaskBis$: ReplaySubject<ProgressTask<Uint8Array>>) => {
    this.createSoundtrackMidi(soundtrack, progressTaskBis$);
    progressTaskBis$.complete();
    return { unsubscribe() { } };
  });
}

public createSoundtrackMidi(soundtrack: Soundtrack, progressTask$?: ReplaySubject<ProgressTask<Uint8Array>>): Uint8Array {
  const midi: Midi = new Midi();
  midi.name = soundtrack.name;
  midi.header.name = soundtrack.name;
  let noteIndex: number = 0;
  if (soundtrack.hasTracks()) {
    soundtrack.tracks.forEach((track: Track) => {
      const midiTrack: any = midi.addTrack();
      midiTrack.name = track.name;
      midiTrack.channel = track.channel;
      if (track.hasMeasures()) {
        let totalDurationInSeconds: number = 0;
        for (const measure of track.getSortedMeasures()) {
          if (measure.placedChords) {
            if (!this.notationService.isOnlyEndOfTrackChords(measure.placedChords)) {
              for (const placedChord of measure.placedChords) {
                if (!this.notationService.isEndOfTrackPlacedChord(placedChord)) {
                  const duration: string = placedChord.renderDuration();
                  const durationInSeconds: number = Tone.Time(duration).toSeconds();
                  const velocity: number = placedChord.velocity;
                  // const tempoInMicroSecondsPerBeat: number = this.beatsToMicroSeconds(1, measure.getTempo());
                  // const ticks: number = this.beatsToTicks(durationInBeats, DEFAULT_MIDI_PPQ, tempoInMicroSecondsPerBeat);
                  for (const note of placedChord.notes) {
                    if (!this.notationService.isEndOfTrackNote(note)) {
                      if (progressTask$) {
                        this.commonService.sleep(50);
                        progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(soundtrack.getNbNotes(), noteIndex));
                      }
                      noteIndex++;
                      midiTrack.addNote({
                        midi: this.synthService.textToMidiNote(note.renderAbc()),
                        time: totalDurationInSeconds,
                        // ticks: ticks,
                        name: note.renderAbc(),
                        pitch: note.renderChroma(),
                        octave: note.renderOctave(),
                        velocity: velocity,
                        duration: durationInSeconds
                      });
                    }
                  }
                totalDurationInSeconds += durationInSeconds;
                }
              }
            }
          }
        }
      }
    });
  }
  if (progressTask$) {
    progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(soundtrack.getNbNotes(), soundtrack.getNbNotes(), midi.toArray()));
  }
  return midi.toArray();
}

有一个 50 毫秒的睡眠调用会减慢文件的创建速度,以便给一些充足的时间。

下载服务的实现基于本文

我在 Angular 9.1.0

标签: angularobservablerxjs-observables

解决方案


为了简单起见,我建议转向一种更具反应性的状态驱动方法,您可以在其中执行类似于以下操作的操作:

1.

soundtracks从数组更改为 apublic readonly soundtracks: Observable<Soundtrack[]> = this.soundtracksSubject.asObservable()以使您的 UI 能够注册更改。this.soundtracksSubject那么private readonly soundtracksSubject: BehaviorSubject<Soundtrack[]> = new BehaviorSubject([]);就是可以用来触发观察者的soundtracks刷新。当您收到 HTTP 响应时,您无需设置this.soundtracks = soundtracks;而是调用this.soundtracksSubject.next(soundtracks)).

此外,在进行实际下载 ( soundtrack.download = download;) 而不是仅在更改后更改模型时,您必须再次调用主题以将模型中的更改传播给侦听器:

const updatedSoundtracks: Soundtrack[] = this.soundtracksSubject.value.map(existingSoundtrack => existingSoundtrack);
const soundtrack: Soundtrack = updatedSoundtracks.find(existingSoundtrack => soundtrack.name === existingSoundtrack.name); // Or whatever identifies your soundtrack

if (soundtrack) {
    soundtrack.download = download;
    this.soundtracksSubject.next(updatedSoundtracks);
}

将您的 UI 从 更改<tr *ngFor="let soundtrack of soundtracks"><tr *ngFor="let soundtrack of soundtracks | async">以解决Observable在您的 Angular 组件中使用它的问题。这也意味着您的组件将注册更改soundtracks并在有人调用主题/可观察对象时收到通知。

Observable并且BehaviorSubject都是 RxJS 概念 ( import {BehaviorSubject, Observable} from "rxjs/index";),值得研究,因为它们让你的生活变得如此轻松。


推荐阅读