javascript - 如何从 RxJS combineLatest 中删除瞬态事件与 NGRX 存储源 observable
问题描述
我正在使用 TypeScript、Angular、NGRX 应用程序。我一直在编写状态可观察对象而不使用选择器——主要原因是我发现它们不如直接使用 RxJS 运算符强大。例如,不能单独使用选择器来限制事件的发射——而是必须使用过滤运算符。
在大多数情况下,用 observables 替换选择器没有任何问题 - observables 可以以与选择器相同的方式组合 - 除了一个例外:我无法弄清楚如何组合可能由同一操作触发的 observables。通常,我使用 combineLatest 作为我的 goto observable composer;然而,如果两个 observables 更新同一个 action,就会有一个短暂的更新,其中一个 observables 具有来自新状态的值,而另一个具有来自前一个状态的值。
最初,我考虑改用 zip observable creator;然而,虽然这解决了两个可观察对象一起更新时的问题,但当一个可观察对象在没有另一个可观察对象的情况下更新时,它并不能解决问题——这在 NGRX 架构中是完全可能的。
然后我考虑了 auditTime(0) 运算符,它确实解决了删除瞬态更新的问题,但有新的问题 1) 它导致 observables 在稍后的事件循环中发出,这打破了应用程序内部的一些假设(可解决,但很烦人) 2) 它使各种可观察对象尽快发射,而我希望所有可观察对象在同一个存储脉冲上一起发射。从图形上看,这意味着应用程序的不同部分的渲染是交错的,而不是在同一帧上一起绘制(请注意,我们的应用程序数据量很大,并且经常需要在存储脉冲上丢帧)
最后,我编写了一个自定义运算符来组合来自同一来源的 observables
export type ObservableTuple<TupleT extends any[]> = {
[K in keyof TupleT]: Observable<TupleT[K]>;
};
export function selectFrom<SourceT, TupleT extends any[]>(...inputs: ObservableTuple<TupleT>): OperatorFunction<SourceT, TupleT> {
return (source$: Observable<SourceT>) => source$.pipe(
withLatestFrom(combineLatest<TupleT>(inputs)),
map(([, values]) => values),
);
}
以下是 TypeScript 中问题的总结(使用 NGRX、RxJS 和 Angular 的片段)
interface IState {
foo: string;
bar: string;
}
@Injectable()
class SomeService {
constructor(store$: Store<IState>) {
}
readonly foo$ = this.store$.pipe(select(state => state.foo));
readonly bar$ = this.store$.pipe(select(state => state.bar));
readonly composed$ = this.store$.pipe(
selectFrom(
this.foo$,
this.bar$,
),
map(([foo, bar]) => `${foo} - ${bar}`),
);
}
const UPDATE_FOO = {
type: 'update foo',
foo: 'some updated value for foo'
};
const UPDATE_BAR = {
type: 'update bar',
bar: 'some updated value for bar',
};
const UPDATE_BOTH = {
type: 'update both',
both: 'some updated value for both foo and bar',
};
即使 selectFrom 调用相互嵌套,这也能正常工作,例如
readonly composed2$ = this.store$.pipe(
selectFrom(
this.composed$,
this.foo$
)
)
只要在composed2$之前定义commodated$,一切正常;但是,我没有考虑的一个情况是在组合 $ 和组合 2 $ 之间使用 switchMap 之类的运算符时。在这种情况下,因为 compsed2$ 被 switchMap 销毁并重新创建,commodated2$ 可能在composed$ 之前触发,这会导致一切不同步
解决方案
对于您尝试组合 2 个可观察对象并且仅在它们都完成发射后才发射的特定问题,您可以尝试利用:
- queue Scheduler - 让您推迟递归调用,直到当前调用完成
- debounce - 延迟更新直到信号到达
- observeOn - 只监听队列调度器上的存储更新
然后您可以执行以下操作:
readonly queuedStore$ = this.store$.pipe(
observeOn(queue), // use queue scheduler to listen to store updates
share() // use the same store event to update all of our selectors
);
// use queuedStore$ here
readonly foo$ = this.queuedStore$.pipe(select(state => state.foo));
readonly bar$ = this.queuedStore$.pipe(select(state => state.bar));
// when composing, debounce the combineLatest() with an observable
// that completes immediately, but completes on the queue scheduler
readonly composed$ = combineLatest(foo$, bar$).pipe(
debounce(() => empty().pipe(observeOn(queue))));
会发生什么?
富更新
- queuedStore$ 安排通知
queue
- 通知立即开始,因为当前没有任何运行
foo$
通知combineLatest
通知debounce
订阅 durationSelector- durationSelector 安排通知
queue
- 未发送通知,因为排队的操作当前正在运行
- 调用堆栈展开到第 1 步
- 队列调度程序运行 durationSelector 通知
- debounce 触发并向 UI 发送更新
酒吧更新
与 Foo 更新相同
BarFoo 更新
- queuedStore$ 安排通知
queue
- 通知立即开始,因为当前没有任何运行
foo$
通知combineLatest
通知debounce
订阅 durationSelector- durationSelector 安排通知
queue
- 未发送通知,因为排队的操作当前正在运行
- 调用堆栈展开到第 3 步
bar$
通知combineLatest
通知debounce
从 foo 通知中丢弃先前的值debounce
重新订阅durationSelector
- durationSelector 安排通知
queue
- 未发送通知,因为排队的操作当前正在运行
- 调用堆栈展开到第 1 步
- 队列调度程序运行 durationSelector 通知
- debounce 触发并向 UI 发送更新
从理论上讲,这会让您得到您想要的行为: - 单个更新立即应用(下一个滴答之前) - 组合更新立即应用(下一个滴答之前) - 组合更新忽略中间结果 - 如果您的组合 observable 使用switch
.
需要注意的事情
如果您在调度程序上处理其中一个通知时调度另一个事件queue
,则该第二个事件的通知将推迟到当前处理程序完成之后。
推荐阅读
- amazon-web-services - 即使在添加 ICMP 入站规则后也无法 PING EC2 实例
- python - 使用 sqlalchemy 从 Python 连接 MySQL,mysql+mysqldb 适用于 Windows+Ubuntu 但不适用于 RHEL
- php - sitemap_index.xml 正在重定向到 404 页面
- swift - Xcode 问题 AppDelegate 与 Firebase - 'NSPersistentCloudKitContainer'
- contiki-process - 如何在节点之间共享数据
- linux - 如何处理一行bash命令中的多个引号?
- scala - Spark / Scala - RDD填充最后一个非空值
- sql - 想要在使用聚合函数时添加第三列,不属于 group by
- html - Django 并排显示 2 个表格
- google-ads-script - 将所有广告系列映射到其 AW 完整路径:MCC/SubMCC/account/campaign