首页 > 解决方案 > 如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable

问题描述

我正在使用 TypeScript、Angular、NGRX 应用程序。我一直在编写状态可观察对象而不使用选择器——主要原因是我发现它们不如直接使用 RxJS 运算符强大。例如,不能单独使用选择器来限制事件的发射——而是必须使用过滤运算符。

在大多数情况下,用 observables 替换选择器没有任何问题 - observables 可以以与选择器相同的方式组合 - 除了一个例外:我无法弄清楚如何组合可能由同一操作触发的 observables。通常,我使用 combineLatest 作为我的 goto observable composer;然而,如果两个 observables 更新同一个 action,就会有一个短暂的更新,其中一个 observables 具有来自新状态的值,而另一个具有来自前一个状态的值。

最初,我考虑改用 zip observable creator;然而,虽然这解决了两个可观察对象一起更新时的问题,但当一个可观察对象在没有另一个可观察对象的情况下更新时,它并不能解决问题——这在 NGRX 架构中是完全可能的。

然后我考虑了 auditTime(0) 运算符,它确实解决了删除瞬态更新的问题,但有新的问题 1) 它导致 observables 在稍后的事件循环中发出,这打破了应用程序内部的一些假设(可解决,但很烦人) 2) 它使各种可观察对象尽快发射,而我希望所有可观察对象在同一个存储脉冲上一起发射。从图形上看,这意味着应用程序的不同部分的渲染是交错的,而不是在同一帧上一起绘制(请注意,我们的应用程序数据量很大,并且经常需要在存储脉冲上丢帧)

最后,我编写了一个自定义运算符来组合来自同一来源的 observables

export type ObservableTuple<TupleT extends any[]> = {
  [K in keyof TupleT]: Observable<TupleT[K]>;
};

export function selectFrom<SourceT, TupleT extends any[]>(...inputs: ObservableTuple<TupleT>): OperatorFunction<SourceT, TupleT> {
  return (source$: Observable<SourceT>) => source$.pipe(
    withLatestFrom(combineLatest<TupleT>(inputs)),
    map(([, values]) => values),
  );
}

以下是 TypeScript 中问题的总结(使用 NGRX、RxJS 和 Angular 的片段)

interface IState {
    foo: string;
    bar: string;
}

@Injectable()
class SomeService {
    constructor(store$: Store<IState>) {
    }

    readonly foo$ = this.store$.pipe(select(state => state.foo));
    readonly bar$ = this.store$.pipe(select(state => state.bar));

    readonly composed$ = this.store$.pipe(
        selectFrom(
            this.foo$,
            this.bar$,
        ),
        map(([foo, bar]) => `${foo} - ${bar}`),
    );
}

const UPDATE_FOO = {
    type: 'update foo',
    foo: 'some updated value for foo'
};
const UPDATE_BAR = {
    type: 'update bar',
    bar: 'some updated value for bar',
};
const UPDATE_BOTH = {
    type: 'update both',
    both: 'some updated value for both foo and bar',
};

即使 selectFrom 调用相互嵌套,这也能正常工作,例如

readonly composed2$ = this.store$.pipe(
   selectFrom(
      this.composed$,
      this.foo$
   )
)

只要在composed2$之前定义commodated$,一切正常;但是,我没有考虑的一个情况是在组合 $ 和组合 2 $ 之间使用 switchMap 之类的运算符时。在这种情况下,因为 compsed2$ 被 switchMap 销毁并重新创建,commodated2$ 可能在composed$ 之前触发,这会导致一切不同步

标签: javascriptrxjsngrx

解决方案


对于您尝试组合 2 个可观察对象并且仅在它们都完成发射后才发射的特定问题,您可以尝试利用:

  • queue Scheduler - 让您推迟递归调用,直到当前调用完成
  • debounce - 延迟更新直到信号到达
  • observeOn - 只监听队列调度器上的存储更新

然后您可以执行以下操作:

readonly queuedStore$ = this.store$.pipe(
    observeOn(queue), // use queue scheduler to listen to store updates
    share() // use the same store event to update all of our selectors
  );

// use queuedStore$ here
readonly foo$ = this.queuedStore$.pipe(select(state => state.foo));
readonly bar$ = this.queuedStore$.pipe(select(state => state.bar));

// when composing, debounce the combineLatest() with an observable
// that completes immediately, but completes on the queue scheduler
readonly composed$ = combineLatest(foo$, bar$).pipe(
  debounce(() => empty().pipe(observeOn(queue))));

会发生什么?

富更新

  1. queuedStore$ 安排通知queue
  2. 通知立即开始,因为当前没有任何运行
  3. foo$通知
  4. combineLatest通知
  5. debounce订阅 durationSelector
  6. durationSelector 安排通知queue
  7. 未发送通知,因为排队的操作当前正在运行
  8. 调用堆栈展开到第 1 步
  9. 队列调度程序运行 durationSelector 通知
  10. debounce 触发并向 UI 发送更新

酒吧更新

与 Foo 更新相同

BarFoo 更新

  1. queuedStore$ 安排通知queue
  2. 通知立即开始,因为当前没有任何运行
  3. foo$通知
  4. combineLatest通知
  5. debounce订阅 durationSelector
  6. durationSelector 安排通知queue
  7. 未发送通知,因为排队的操作当前正在运行
  8. 调用堆栈展开到第 3 步
  9. bar$通知
  10. combineLatest通知
  11. debounce从 foo 通知中丢弃先前的值
  12. debounce重新订阅durationSelector
  13. durationSelector 安排通知queue
  14. 未发送通知,因为排队的操作当前正在运行
  15. 调用堆栈展开到第 1 步
  16. 队列调度程序运行 durationSelector 通知
  17. debounce 触发并向 UI 发送更新

从理论上讲,这会让您得到您想要的行为: - 单个更新立即应用(下一个滴答之前) - 组合更新立即应用(下一个滴答之前) - 组合更新忽略中间结果 - 如果您的组合 observable 使用switch.

需要注意的事情

如果您在调度程序上处理其中一个通知时调度另一个事件queue,则该第二个事件的通知将推迟到当前处理程序完成之后。


推荐阅读