首页 > 解决方案 > 检测在订阅块中更新的 BehaviorSubject 组件成员变量的变化?

问题描述

我在组件视图中有一个元素:

<div>Progress: {{ progress$ | async }}</div>

next()它应该通过块中的调用以连续的方式更新subscribe()

progress$: BehaviorSubject<number> = new BehaviorSubject(0);
downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi$(soundtrack);
  console.log('Created the observable');
  const piper$: Observable<ProgressTask<Uint8Array>> = progress$.pipe(
    tap((progressTask: ProgressTask<Uint8Array>) => {
      this.progress$.next(progressTask.loaded);
      console.log('Loaded: ' + progressTask.loaded);
    })
  );
  this.download$ = this.downloadService.downloadObservableDataAsBlobWithProgressAndSaveInFile(piper$, fileName);
  console.log('Controller method call complete');
}

运行它时,控制台日志显示:

Created the observable
Controller method call complete
Loaded: 0
Loaded: 1
Loaded: 2
...
Loaded: 591

但是,视图中的元素并没有持续更新,而只是在最后一个值处,从 0 直接变为 591。

我尝试this.detectChanges();在通话后添加一个,next()但没有帮助。

我尝试将调用包装next()在一个ngZone块中,但没有帮助。

sleep方法实现为:

public sleep(milliseconds: number): void {
  const date = Date.now();
  let currentDate = null;
  do {
    currentDate = Date.now();
  } while (currentDate - date < milliseconds);
}

在 Angular 11 上。

更新:我创建了另一个组件方法来触发一个interval()可观察的并且有效。它确实显示了progress模板中的值不断变化。

testProgress(): void {
  console.log('Called the testProgress method');
  interval(1000)
  .subscribe((value: number) => {
    this.progressSubject$.next(value);
    this.detectChanges();
    console.log('The progress: ' + value);
  });
}

更新:我还尝试订阅第一个 observable,如下所示:

downloadSoundtrack(soundtrack: Soundtrack): void {
  const fileName: string = soundtrack.name + '.' + MIDI_FILE_SUFFIX;
  const progress$: Observable<ProgressTask<Uint8Array>> = this.midiService.progressiveCreateSoundtrackMidi$(soundtrack);
  console.log('Created the observable');
  const piper$: Observable<ProgressTask<Uint8Array>> = progress$
  .subscribe((progressTask: ProgressTask<Uint8Array>) => {
    this.progressSubject$.next(progressTask.loaded);
    this.detectChanges();
    console.log('Loaded: ' + progressTask.loaded);
  });
  console.log('Controller method call complete');
}

但是模板中的进度在最后一个值之前仍然没有更新。

更新:这次我试图改变服务类产生进度的方式。而不是通过下载来完成,如下所示:

public progressiveCreateSoundtrackMidi$(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
   return new Observable((observer$ : Subscriber<ProgressTask<Uint8Array>>) => {
     this.createSoundtrackMidi(soundtrack, observer$);
     return { unsubscribe() { } };
   });
}

我伪造了它:

public progressiveCreateSoundtrackMidi$(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
  return interval(1000)
  .pipe(
    map((value: number) => {
      return this.downloadService.createProgressTask<Uint8Array>(1000, value);
    })
  );
}

现在,随着进度不断变化并反映在模板中,这可以正常工作。

我的createSoundtrackMidi服务方法有些可疑:

public createSoundtrackMidi(soundtrack: Soundtrack, progressTask$?: Subscriber<ProgressTask<Uint8Array>>): Uint8Array;

所以我伪造了上面的服务方法:

public createSoundtrackMidi(soundtrack: Soundtrack, progressTask$?: Subscriber<ProgressTask<Uint8Array>>): Uint8Array {
  const midi: Midi = new Midi();
  if (progressTask$){
    for (let index: number = 0; index < 1000; index++) {
      this.commonService.sleep(10);
      progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(1000, index));
    }
    progressTask$.next(this.downloadService.createProgressTask<Uint8Array>(1000, 1000));
    progressTask$.complete();
  }
  return midi.toArray();
}

问题又出现了。

要查看方法参数progressTask$?: Subscriber<ProgressTask<Uint8Array>>是否仅更新本地副本,我伪造了以下服务方法:

public progressiveCreateSoundtrackMidi$(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
  return new Observable((observer$ : Subscriber<ProgressTask<Uint8Array>>) => {
    for (let index: number = 0; index < 1000; index++) {
      this.commonService.sleep(10);
      observer$.next(this.downloadService.createProgressTask<Uint8Array>(1000, index));
    }
    observer$.next(this.downloadService.createProgressTask<Uint8Array>(1000, 1000));
    observer$.complete();
    return { unsubscribe() { } };
  });
}

问题出现了。方法调用不是问题的原因。

interval使用我自己的实现时也没有显示该问题:

public progressiveCreateSoundtrackMidi$(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
  const max: number = 200;
  return this.interval(max)
  .pipe(
    map((value: number) => {
      return this.downloadService.createProgressTask<Uint8Array>(max, value);
    })
  );
}

private interval(period: number): Observable<number> {
  return new Observable((observer$: Subscriber<number>) => {
    let i = 0;
    const handler = setInterval(() => observer$.next(i++), period);
    return () => clearInterval(handler);
  });
}

是因为setInterval()函数在每次迭代之间都没有阻塞吗?

该问题不会显示在:

public progressiveCreateSoundtrackMidi$(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
  const max: number = 200;

  return new Observable((observer$: Subscriber<ProgressTask<Uint8Array>>) => {
    let index = 0;
    const handler = setInterval(() => {
      observer$.next(this.downloadService.createProgressTask<Uint8Array>(max, index));
      index++;
    }, max);
    return () => clearInterval(handler);
  });
}

setTimeout()但是函数调用中的循环显示了问题:

public progressiveCreateSoundtrackMidi$(soundtrack: Soundtrack): Observable<ProgressTask<Uint8Array>> {
  const max: number = 200;

  return new Observable((observer$: Subscriber<ProgressTask<Uint8Array>>) => {
    const handler = setTimeout(() => {
      for (let index: number = 0; index < max; index++) {
        this.commonService.sleep(10);
        observer$.next(this.downloadService.createProgressTask<Uint8Array>(max, index));
      }
      observer$.next(this.downloadService.createProgressTask<Uint8Array>(max, max));
      observer$.complete();
    }, max);
    return () => clearTimeout(handler);
  });
}

setInterval()如何以与函数相同的方式在每次循环迭代之间解除阻塞?

标签: angularrxjsrxjs-observables

解决方案


推荐阅读