首页 > 解决方案 > 如何在 RxJS 中选择性地发出热(共享)的 observables?

问题描述

在我的代码中,我有一个热可观察对象(通过 share() 运算符传输的冷可观察对象)发出带有标识符的值(即有效负载类型包含一个“id”字符串属性)。一群观察者然后根据所述 id 过滤发出的值。

我现在认为我的代码正在对性能造成巨大影响,因为每个观察者都必须过滤每个发出的值。或者更确切地说,每次我需要一个新的观察者来订阅共享的 observable 时,都会创建一个新的过滤后的 observable。代码看起来像这样:

const source = new Observable();
const sharedObservable = source.pipe(share());
sharedObservable
    .pipe(filter(payload => payload.id === "id1"))
    .subscribe(observerOne);
sharedObservable
    .pipe(filter(payload => payload.id === "id2"))
    .subscribe(observerTwo);
// etc.

我最初的解决方案是创建我自己的热/共享 observable 类型,它根据从 ids 到应该接收有效负载的观察者列表的内部映射来处理过滤。这将减少必须为每个订阅的观察者运行一些谓词过滤器功能的成本。在 observable 的下面看起来像(或模糊地像)这样:

class FilteredHotObservable {
    constructor(sourceObservable) {
        this.observerMap = new Map();
        this.source = sourceObservable;
        this.source.subscribe(payload => {
            const observers = this.observerMap.get(payload.id);
            observers.forEach(observer => observer.next(payload));
        });
    }

    subscribe(observer, desiredIdToFilterBy) {
        const observers = this.observerMap.get(desiredIdToFilterBy);
        observers.push(observer);
    }
}

不过,在我开始实施之前,我想知道是否已经有处理这种情况的正确方法。换句话说,RxJS 是否已经有一种机制(例如,一个特殊的 Subject,一个花哨的操作符)来促进这种可扩展的“过滤和共享”可观察性?

免责声明:上面代码片段的名称和语法与我实际所做的不准确。为了这篇文章,它们只是示例。

编辑:以下是有关使用filter运算符的排放处理延迟的实际数字。对于“PostFilterLatency”,发出的有效负载由对应于共享 observable 上的单个观察者的 id 过滤。对于“PreFilterLatency”,通过 id->Observer map 手动过滤排放,然后使用适当的有效负载调用observer.next()。这些数字是从发射有效载荷到相应的观察者处理它们所花费的时间(时间以毫秒为单位)。

ObserverCount  PostFilterLatency  PreFilterLatency
-------------  -----------------  ----------------
1              0                  0
10             0                  0
100            0.03               0
1000           0.039              0.001
10000          1.1849             0.0005
100000         19.90902           0.002

编辑:这是我用来运行这些测试的代码的链接。

标签: javascripttypescriptrxjsobservablereactivex

解决方案


推荐阅读