首页 > 解决方案 > PublishSubject 为所有观察者调用昂贵的函数

问题描述

我有一个特定的场景,我实现了一个 PublishSubject 来根据自定义事件发出项目。对于将要发出的每个项目,我还需要保留该值(一项昂贵的操作)。我想要实现的是拥有一个函数(如map),它将为所有观察者调用一次,然后每个观察者通过该onNext()方法接收的项目。

题目:

static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()

触发器(发射项目):

commonSubject.onNext(new SomeResult())

暴露主题(将由控制器使用):

public static Observable<SomeResult> observeResults() {
    return commonSubject.share();
}

控制器:

public Observable<SomeResult> observeResults() {
    return CustomConsumer.observeResults()
            .observeOn(Schedulers.single());
}   

订户:

CustomControllerResult.observeResults().subscribe(result -> doSomething());
CustomControllerResult.observeResults().subscribe(result -> doSomethingElse());

每个观察者都按预期接收项目,但是如果我将昂贵的操作添加到控制器,则会为每个观察者调用此操作(这是我不想要的):

public Observable<SomeResult> observeResults() {
    return CustomConsumer.observeResults()
            .observeOn(Schedulers.single())
            .compose(persistResult())
            .compose(logResult())
            .share();
}

关于如何达到预期结果的任何想法?

标签: androidrx-java2publishsubject

解决方案


问题是每次observeResults()调用它都会用share操作符创建一个新的 Observable。但是创建的 Observable 并没有与订阅者共享。

您可以将代码更改为:

Observable<SomeResult> observable = CustomControllerResult.observeResults()
observable.subscribe(result -> doSomething());
observable.subscribe(result -> doSomethingElse());

或者您可以更改observeResults方法以返回共享的 Observable:

static final PublishSubject<SomeResult> commonSubject = PublishSubject.create()

static final Observable<SomeResult> observable = commonSubject
    .observeOn(Schedulers.single())
    .compose(persistResult())
    .compose(logResult())
    .share();

public static Observable<SomeResult> observeResults() {
    return observable;
}

推荐阅读