首页 > 解决方案 > RXJS 扫描 - withLatestFrom 另一个 observable

问题描述

我正在尝试研究如何在我的输入 observable 发出新值时使用 scan 导出新状态,但我似乎无法让它工作。

我想每次 input$ observable 发出一个新值时输出一个新状态,但它应该从 state$ 的当前值派生。

谁能建议我如何解决这个问题?我有一种感觉我完全有错误的想法:-)

我的代码看起来像这样:

const stateReducer = (state$: Observable<State>, input$: Observable<Input>) => {
  state$ = state$.pipe( startWith(DEFAULT_STATE) );

  const foo$: Observable<State> = input$.pipe(
    filter((input) => isFoo(input)),
    withLatestFrom(state$),
    scan((acc, ([input, state]) => {
        //returns derived state
    });

   const bar$: Observable<State> = input$.pipe(
    filter((input) => isBar(input)),
    withLatestFrom(state$),
    scan((acc, ([input, state]) => {
        //returns derived state
    });

   return merge(
     foo$,
     bar$
   );
}

标签: javascripttypescriptrxjs

解决方案


由于您想使用 Observable 的结果作为您的种子值,switchMap这将有所帮助。

switchMap 文档

   const bar$: Observable<State> = state$.pipe(
    switchMap((state) => {
      return input$.pipe(
        filter((input) => isBar(input)),  
        scan((curState, input) => {
          // do some logic here
          return {...curState, prop: 'new value'}
        }, state);
      )
    })

我在https://codepen.io/askmattcairns/pen/LYjEoZz?editors=0010制作了一个示例 CodePen 以获得更完整的解决方案。


更多细节

switchMap意思是切换到这里的流。所以这段代码是说,一旦state$发出一个值,将它存储(as state),然后等待input$发出。

当我们调用 时scan,它的种子值( 的第二个属性scan)现在是state$的发出值的结果。

这将在任何时候input$收到一个新值时发出一个新值。

更新代码沙箱

在深入了解您的代码沙箱后,我更好地了解了问题所在。你可以在https://codesandbox.io/s/elegant-chaum-2b794?file=/src/index.tsx看到我的最终输出。

当您初始化 2 个内部流foo$bar$,它们都state$使用withLatestFrom. 每次input$发出时,它仍然引用 的原始值state,使用 0 作为其起始总数。


推荐阅读