首页 > 解决方案 > 带过滤器的 takeUntil

问题描述

我在 NestJS 中有一些带有 mergeMap 运算符的代码,我需要根据来自外部和内部 observables 的一些值来停止 observables 流。考虑这个例子:

type SomeEvent1 {
  value: string;
}

type SomeEvent2 {
  value: string;
}

@Injectable()
export class TradeSagas {
  private readonly logger = new MyLogger(TradeSagas.name);
  @Saga()
  someSaga = (events$: Observable<IEvent>): Observable<ICommand> => {
    return events$.pipe(
      ofType(SomeEvent1),
      mergeMap((event: SomeEvent1) => {
        events$.pipe(
          ofType(SomeEvent2),
          map((event) => new SomeEvent3())
        );
      })
    );
  };

例如,当 SomeEvent1.value === SomeEvent2.value 时,我需要能够取消订阅侦听SomeEvent1的流或侦听的流SomeEvent2(尚不确定我的特定用例需要什么)。

我知道我可以使用 takeUntil 运算符,但它接受另一个 observable,所以我很困惑如何使它工作。

我想取消订阅的原因是因为 mergeMap 会跟踪所有以前的事件值,有时它会弄乱流程。我知道我可以使用 switchMap 但这不是我特定问题的解决方案,因为我不需要仅仅因为新事件到达而取消外部 observable - 我有一些更复杂的逻辑来构建事件流。如果对如何处理此问题有其他想法,请告诉我。

谢谢你的帮助。

标签: typescriptrxjsnestjs

解决方案


对于您的情况,takeWhile这将是一个更好的选择,因为它需要一个函数而不是可观察的:

  someSaga = (events$: Observable<IEvent>): Observable<ICommand> => {
    return events$.pipe(
      ofType(SomeEvent1),
      mergeMap(event1 => events$.pipe(
          ofType(SomeEvent2),
          takeWhile(event2 => event2.value === event1.value),
          map(event2 => new SomeEvent3())
        );
      })
    );
  };

编辑:

有时,如果将源分解为单独的变量,则更容易理解。所以我认为这可能是你正在寻找的:

const event1$ = events$.pipe(ofType(SomeEvent1));
const event2$ = events$.pipe(ofType(SomeEvent2));
const event3$ = events$.pipe(ofType(SomeEvent3));

const someSaga2 = events1$.pipe(
  mergeMap(event1 => events2$.pipe(
    takeUntil(events3$.pipe(
        filter(event3 => event3.value === event1.value)
    ))
  ))
);

行为描述:当Event1发射时,发射Event2直到Event3发射valueEvent1.


推荐阅读