首页 > 解决方案 > RxJs - 结合两个 observables,结果选择器函数,zip

问题描述

我正在处理一些遗留代码,并且设备状态更新错误存在问题。问题是,旧的 -遗留状态更新是使用长轮询完成的,并且它在 BE 上存在无法修复的并发问题(BE 已弃用)。有时它会忘记更新状态。消息到达但不正确。

我可以从新的 BE 中检索到请求它的正确状态(该应用程序现在是一个令人讨厌的混合体),但我无法正确地做到这一点。我的第一个想法是使用whitLatestFrom,但由于明显的原因它不起作用。

legacyService.getState<LegacyState>()
  .pipe(
    subscribeOn(queue),
    withLatestFrom(this.realService.getRealStatus()), // request to the new BE
    map(([legacy, real]) => {
      // This if statement is a must. getRealStatus() returns only partial state, not the whole DeviceState
      // If there is a difference in the state from two sources, I create a new object with correct state
      if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
        return new LegacyState(
          legacy.instrument, 
          legacy.db,
          DeviceState.UNSTABLE, 
        );
      }
      return event; // states are same, returning state from legacy source
    })
  )
  .subscribe(event => {
    this._deviceStateUpdated$.next(event);
  });

适用于应用程序重新启动/重新加载,但稍后real状态不会更新,因为没有进行新调用,它只是返回以前的值。也一样combineLatest。第一个(来自轮询)被更新,第二个只是返回以前的值。

问题是:我怎样才能以这种方式组合两个可观察对象,当第一个被更新时,我也强制更新第二个可观察对象的新值?当然,我可以同时处理它们,因为第二个 observable 只返回一个部分状态。我尝试了多个地图 ( swtichMap, concatMap, ...) 但没有成功。

标签: angulartypescriptrxjscombinelatest

解决方案


我所说的投影函数更准确地称为结果选择函数。请参阅本页上的示例 3 。我将使用的基本结构是这样的:

import { interval } from 'rxjs';
import { switchMap } from 'rxjs/operators';

//whatever your long polling interval is
const polling = interval(10000);

//then use that to collect your states 
const collectState = polling
    //pipe the interval event
   .pipe(
      //first you need the real state so you switchMap into that
      //I'm assuming this is Rx can treat as an observable, like a promise
      switchMap(this.realService.getRealStatus()),
      //then you have your real status and you need to use a result selector function
      switchMap(() => 
         legacyService.getState(),
         //at this point in the result selector function you have access to both states
         (real, legacy) => {
             //so you can apply your logic
             if (legacy.state === DeviceState.INITIALIZED && !real.stable) {
                 return new LegacyState(
                    legacy.instrument, 
                    legacy.db,
                    DeviceState.UNSTABLE
                );
             } else { return legacy }
      })
    );
  

我意识到我已经无济于事地改变了真实/传统的顺序,但你明白了要点。

另一种方法是创建一个区间 observable 和一个 zipped observable,它们仅在真实状态和遗留状态都发出时才发出

import { zip, interval } from 'rxjs';
import { switchMap, map } from 'rxjs/operators';

const polling = interval(10000);
const states = zip(
    legacyService.getState(),
    this.realService.getRealStatus()
);
const collectState = polling
    .pipe(
        switchMap(states),
        map(statesArray => {
           //do the work here
        })
    );

);

希望这里有帮助


推荐阅读