typescript - 带过滤器的 takeUntil
问题描述
我在 NestJS 中有一些带有 mergeMap 运算符的代码,我需要根据来自外部和内部 observables 的一些值来停止 observables 流。考虑这个例子:
type SomeEvent1 {
value: string;
}
type SomeEvent2 {
value: string;
}
@Injectable()
export class TradeSagas {
private readonly logger = new MyLogger(TradeSagas.name);
@Saga()
someSaga = (events$: Observable<IEvent>): Observable<ICommand> => {
return events$.pipe(
ofType(SomeEvent1),
mergeMap((event: SomeEvent1) => {
events$.pipe(
ofType(SomeEvent2),
map((event) => new SomeEvent3())
);
})
);
};
例如,当 SomeEvent1.value === SomeEvent2.value 时,我需要能够取消订阅侦听SomeEvent1
的流或侦听的流SomeEvent2
(尚不确定我的特定用例需要什么)。
我知道我可以使用 takeUntil 运算符,但它接受另一个 observable,所以我很困惑如何使它工作。
我想取消订阅的原因是因为 mergeMap 会跟踪所有以前的事件值,有时它会弄乱流程。我知道我可以使用 switchMap 但这不是我特定问题的解决方案,因为我不需要仅仅因为新事件到达而取消外部 observable - 我有一些更复杂的逻辑来构建事件流。如果对如何处理此问题有其他想法,请告诉我。
谢谢你的帮助。
解决方案
对于您的情况,takeWhile
这将是一个更好的选择,因为它需要一个函数而不是可观察的:
someSaga = (events$: Observable<IEvent>): Observable<ICommand> => {
return events$.pipe(
ofType(SomeEvent1),
mergeMap(event1 => events$.pipe(
ofType(SomeEvent2),
takeWhile(event2 => event2.value === event1.value),
map(event2 => new SomeEvent3())
);
})
);
};
编辑:
有时,如果将源分解为单独的变量,则更容易理解。所以我认为这可能是你正在寻找的:
const event1$ = events$.pipe(ofType(SomeEvent1));
const event2$ = events$.pipe(ofType(SomeEvent2));
const event3$ = events$.pipe(ofType(SomeEvent3));
const someSaga2 = events1$.pipe(
mergeMap(event1 => events2$.pipe(
takeUntil(events3$.pipe(
filter(event3 => event3.value === event1.value)
))
))
);
行为描述:当Event1
发射时,发射Event2
直到Event3
发射value
与Event1
.
推荐阅读
- reactjs - axios 的返回值不是从缓存中获取的
- junit - 如果存在于多个方法中,如何模拟局部变量
- python - 在数据框底部创建行,计算出现次数
- c# - 实例化 EF Core DbContexts
- prestashop-1.7 - 我正在使用 prestashop 1.7.5.2 一些客户想要从他们的商店中删除他们只想使用购物车功能的付款选项
- python - 为什么 scipy.optimize 参数上的误差线会超出范围
- vue.js - v-选择多个未设置值
- r - 未来的系统命令/Rshiny 中的承诺
- mongodb - 是否可以使用颤振飞镖连接到本地 MongoDB?
- android - 如何将徽章值添加到 android tabview?