首页 > 解决方案 > 如何根据密钥加入流

问题描述

这是为了,redux-observable但我认为一般模式对于rxjs

我有一个事件流(来自redux-observable,这些是 redux 操作),我特别希望为同一个“资源”配对两种不同类型的事件 - “资源集活动”和“资源加载” - 并发出一个新的这些事件“匹配”时的事件。问题是这些可以以任何顺序进入任何资源,并且可以多次触发。您可以在加载之前将某些内容设置为活动状态,或者在将其设置为活动状态之前加载某些内容,并且其他资源可能会在两者之间加载或设置为活动状态。

我想要的是“这个资源,现在被加载,现在是活动的”的流——这也意味着一旦一个资源被加载,它就可以被认为是永远加载的。

如果这些事件不是由资源 id 键入的,那么这将非常简单:

首先,我会按类型将它们分开:

const setActive$ = action$.filter(a => a.type == 'set_active');
const load = action$.filter(a => a.type == 'loaded');

在没有键控的简单情况下,我会说:

const readyevent$ = setActive$.withLatestFrom(loaded$)

thenreadyevent$只是一个set_active事件流,其中至少有一个loaded事件。

但我的问题是set_activeloaded事件都由资源 ID 键入,并且由于我无法控制的原因,识别资源的属性在两个事件中是不同的。所以这变成了这样:

const setActive$ = action$.filter(a => a.type === 'set_active').groupBy(a => a.activeResourceId);
const loaded$ = action$.filter(a => a.type === 'loaded').groupBy(a => a.id);

但从这一点来看,我真的不知道如何在同一个键上“重新加入”这两个分组的可观察对象流,以便我可以发出一个withLatestFrom动作流。

标签: rxjsredux-observable

解决方案


我相信这可以满足您的描述:

const action$ = Rx.Observable.from([
  { activeResourceId: 1, type: 'set_active' },
  { id: 2, type: 'loaded' },
  { id: 1, type: 'random' },
  { id: 1, type: 'loaded' },
  { activeResourceId: 2, type: 'set_active' }
]).zip(
  Rx.Observable.interval(500),
  (x) => x
).do((x) => { console.log('action', x); }).share();

const setActiveAction$ = action$.filter(a => a.type === 'set_active')
  .map(a => a.activeResourceId)
  .distinct();
const allActive$ = setActiveAction$.scan((acc, curr) => [...acc, curr], []);
  
const loadedAction$ = action$.filter(a => a.type === 'loaded')
  .map(a => a.id)
  .distinct();
const allLoaded$ = loadedAction$.scan((acc, curr) => [...acc, curr], []);
  
Rx.Observable.merge(
  setActiveAction$
    .withLatestFrom(allLoaded$)
    .filter(([activeId, loaded]) => loaded.includes(activeId)),
  loadedAction$
    .withLatestFrom(allActive$)
    .filter(([loadedId, active]) => active.includes(loadedId))
).map(([id]) => id)
 .distinct()
 .subscribe((id) => { console.log(`${id} is loaded and active`); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>

基本方法是为每种操作类型创建一个不同的流,并将其与另一个不同的聚合连接。然后合并两个流。当有匹配的 setActive 和加载的事件时,这将发出该值。合并distinct()结束时,您只会收到一次通知。如果您想在初始操作之后收到每个 setActive 操作的通知,则只需删除该操作员即可。


推荐阅读