rxjs - 如何根据密钥加入流
问题描述
这是为了,redux-observable
但我认为一般模式对于rxjs
我有一个事件流(来自redux-observable
,这些是 redux 操作),我特别希望为同一个“资源”配对两种不同类型的事件 - “资源集活动”和“资源加载” - 并发出一个新的这些事件“匹配”时的事件。问题是这些可以以任何顺序进入任何资源,并且可以多次触发。您可以在加载之前将某些内容设置为活动状态,或者在将其设置为活动状态之前加载某些内容,并且其他资源可能会在两者之间加载或设置为活动状态。
我想要的是“这个资源,现在被加载,现在是活动的”的流——这也意味着一旦一个资源被加载,它就可以被认为是永远加载的。
如果这些事件不是由资源 id 键入的,那么这将非常简单:
首先,我会按类型将它们分开:
const setActive$ = action$.filter(a => a.type == 'set_active');
const load = action$.filter(a => a.type == 'loaded');
在没有键控的简单情况下,我会说:
const readyevent$ = setActive$.withLatestFrom(loaded$)
thenreadyevent$
只是一个set_active
事件流,其中至少有一个loaded
事件。
但我的问题是set_active
和loaded
事件都由资源 ID 键入,并且由于我无法控制的原因,识别资源的属性在两个事件中是不同的。所以这变成了这样:
const setActive$ = action$.filter(a => a.type === 'set_active').groupBy(a => a.activeResourceId);
const loaded$ = action$.filter(a => a.type === 'loaded').groupBy(a => a.id);
但从这一点来看,我真的不知道如何在同一个键上“重新加入”这两个分组的可观察对象流,以便我可以发出一个withLatestFrom
动作流。
解决方案
我相信这可以满足您的描述:
const action$ = Rx.Observable.from([
{ activeResourceId: 1, type: 'set_active' },
{ id: 2, type: 'loaded' },
{ id: 1, type: 'random' },
{ id: 1, type: 'loaded' },
{ activeResourceId: 2, type: 'set_active' }
]).zip(
Rx.Observable.interval(500),
(x) => x
).do((x) => { console.log('action', x); }).share();
const setActiveAction$ = action$.filter(a => a.type === 'set_active')
.map(a => a.activeResourceId)
.distinct();
const allActive$ = setActiveAction$.scan((acc, curr) => [...acc, curr], []);
const loadedAction$ = action$.filter(a => a.type === 'loaded')
.map(a => a.id)
.distinct();
const allLoaded$ = loadedAction$.scan((acc, curr) => [...acc, curr], []);
Rx.Observable.merge(
setActiveAction$
.withLatestFrom(allLoaded$)
.filter(([activeId, loaded]) => loaded.includes(activeId)),
loadedAction$
.withLatestFrom(allActive$)
.filter(([loadedId, active]) => active.includes(loadedId))
).map(([id]) => id)
.distinct()
.subscribe((id) => { console.log(`${id} is loaded and active`); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>
基本方法是为每种操作类型创建一个不同的流,并将其与另一个不同的聚合连接。然后合并两个流。当有匹配的 setActive 和加载的事件时,这将发出该值。合并distinct()
结束时,您只会收到一次通知。如果您想在初始操作之后收到每个 setActive 操作的通知,则只需删除该操作员即可。
推荐阅读
- sql - 当临时表位置上仅存在基于修订 ID 的值站点程序集时,如何进行状态更新?
- php - 在 PHP 中没有任何库的数组到 XML
- javascript - 从 Context 更新状态时,React 组件不会重新渲染
- python - 使用python将带有unicode utf-8的csv加载到elasticsearch中
- intellij-idea - 调试时在 Intellij-Idea 中转储一个巨大的列表
- java - 如何迁移添加日期列的房间数据库?
- json - 将自定义输入类型添加到 graphql-tag 突变
- python - 是否有另一种方法可以为您的 django 应用程序添加实时更新
- asp.net-mvc - 无法将 GlyphIcon 附加到文本框
- ruby - Rubocop:当只发现警告时,是否可以告诉 Rubocop 返回零作为退出代码?