首页 > 解决方案 > 如何在 RxJava 中正确转换多播 observables

问题描述

假设我有一个事件发射数据源,我想将其转换为反应流。数据源由资源(例如定期发送更新状态的套接字)绑定,因此我想共享对该资源的单个订阅。使用单个 observable with replay(让新订阅者立即获得当前值)和refCount运营商似乎非常适合。例如,这是他的MyDataProvider单身人士的样子:

private final Observable<MyData> myDataObservable = Observable.<MyData>create(emitter -> {
    // Open my resource here and emit data into observable
})
    .doOnDispose(() -> {
        // Close my resource here
    })
    .replay(1)
    .refCount();

public Observable<MyData> getMyDataObservable() {
    return myDataObservable;
}

但是,现在假设我有另一个数据源需要第一个数据源的结果来计算自己的值:

private final Observable<AnotherData> anotherDataObservable = getMyDataProvider().getMyDataObservable()
    .flatMap(myData -> {
        // Call another data source and return the result here
    })

public Observable<AnotherData> getAnotherDataObservable() {
    return anotherDataObservable;
}

在这里,我的设置开始崩溃。第一个 observable 的多播只在refCount操作符之前有效。之后,一切都再次单播。这意味着如果进行了两次单独的订阅,anotherDataProviderflatMap操作员将被调用两次。我看到了两种解决方法,但我都不喜欢这两种方法:

1. 在多播发生之前转换第一个 observable

对我来说,最简单的解决方法似乎是myDataObservable在进行多播操作之前保存某处的单播变体,然后在其中执行该多播操作anotherDataObservable但是如果这两个可观察对象位于不同的模块中,这种解决方法会使代码非常不优雅,需要MyDataProvider公开两个看似返回相同数据的不同可观察对象。

2. 只使用重复的组播操作符

replay第二种解决方法似乎是refCountanotherDataObservable. 但这会导致效率低下,因为myDataObservable已经应用了第一个多播运算符,但现在什么都不做,除了浪费内存和 CPU 周期。

两种解决方法还涉及AnotherDataProviderMyDataProvider. 如果将来MyDataProvider不再需要更改和多播,我还必须更新AnotherDataProvider以从那里删除多播运营商。

解决这个问题的更优雅的方法是什么?我可以更好地构建它来完全避免这个问题吗?

标签: javaandroidrx-javareactive-programmingrx-java2

解决方案


关于您的第一种方法,在当前设置中,您的anotherDataObservable用途myDataObservable以及据我所知它们在逻辑上是耦合的,因为它们使用相同的来源。因此,您需要为它们提供一些基本的共享逻辑。我会将它提取到一个公共模块中,该模块将公开可观察的单播版本,然后在不同的模块中制作myDataObservableanotherDataObservable使用它,每个模块都添加多播逻辑。

另一种选择是有一个类,通过订阅它来监视您的资源,如在myDataObservable中进行处理并使用SubjectonNext发布映射结果,即BehavioralSubject如果您希望始终可以访问最后发布的值,以及原始结果与另一个主题。客户端将订阅该主题,并将获得在监控类中仅计算一次的映射或原始值。

PS记得在订阅之前给你的Subject添加背压策略。

如果这些选项不适合您,请考虑避免flatMap多次调用是否真的很重要?您的代码非常简单,它是一个重要的指标。如果 flatMap 不重,你可以让它运行多次。


推荐阅读