首页 > 解决方案 > RxJS - 初始状态和更新

问题描述

我需要从 websocket 获取数据,并且我想使用 RxJS 来做到这一点。

有一个用于最新初始数据(约 1000 条记录)的 websocket 1和用于增量更新的websocket 2 。

我创建了两个可观察对象:

我最初的实现是:

initialState.subscribe(initialData=> {
    console.log(initialData);
    updateEvent.subscribe(updateEvent => {
        console.log(updateEvent);
    });
});

我面临的问题是在获取initalState并接收第一个更新(updateEvent)之后存在间隙。

(我可能会丢失在获取初始数据之后和订阅之前发生的更新)。

有没有一些实用的方法可以创建一个新的观察者同时订阅我的两个观察者并缓冲updateEvent观察者直到initalState完成,然后让它们以正确的顺序“首先是初始数据”然后是“更新”?

基本上使 initialState 只是“第一个”更新,但要确保在那之后没有任何丢失的更新。

标签: javascriptnode.jstypescriptrxjsreactive-programming

解决方案


看起来你可以通过使用buffer第二个 websocket 流来实现你所需要的,直到第一个发出。虽然,这条链变得有点复杂,因为您只想在第一个流发出后才开始接收值。

const initialStateShared = initialState.pipe(share());
const updateEventShared = updateEvent.pipe(share());

merge(
  initialStateShared,
  updateEventShared.pipe( // Buffer the second stream but only once
    buffer(initialStateShared),
    take(1),
  ),
  updateEventShared.pipe( // Updates from the second stream will be buffered first and then continue comming from here
    skipUntil(initialStateShared),
  )
).subscribe(...);

推荐阅读