首页 > 解决方案 > 如何创建一个在订阅时激活的多播 observable?

问题描述

我想融合几个 Android 传感器的输入并将输出公开为支持多个同时观察者的可观察对象(或至少可以订阅的对象)。解决这个问题的惯用方法是什么?标准库中是否有一个类可以作为一个好的起点?

我正在考虑用一个或多个测试激活传感器的方法PublishSubject的委托将 a 包装在一个对象中,并将返回的包装在一个测试以停用它们的代理中。像这样的东西,虽然这已经有一些明显的问题:subscribehasObserversDisposablehasObservers

public class SensorSubject<T> {
    private final PublishSubject<T> mSubject = PublishSubject.create();

    public Disposable subscribe(final Consumer<? super T> consumer) {
        final Disposable d = mSubject.subscribe(consumer);
        if(mSubject.hasObservers()) {
            // activate sensors
        }
        return new Disposable() {
            @Override
            public void dispose() {
                // possible race conditions!
                if(!isDisposed()) {
                    d.dispose();
                    if(!mSubject.hasObservers()) {
                        // deactivate sensors
                    }
                }
            }

            @Override
            public boolean isDisposed() {
                return d.isDisposed();
            }
        };
    }
}

标签: rx-java2

解决方案


在 RxJava 中这样做的惯用方法是使用hot observable

当有人订阅冷可观察对象并将所有项目发送给该订阅者时,它们会执行一些操作。所以这是1对1的关系。

Hot observable执行一些操作并在单独订阅时独立发出项目。因此,如果您订阅得太晚,您可能无法获得一些较早发出的值。这是一对多的关系,也就是多播——这就是你想要的。

通常的方法是Flowable.publish()进行Flowable多播,但需要调用connect()方法来开始发射值。

在您的情况下,您还可以调用refCount()which 添加您想要的功能 - 当至少有一个订阅时它订阅源 Flowable 并在每个人都取消订阅时取消订阅。

因为publish().refCount()是相当流行的组合,所以他们有一个捷径 - share()。据我了解,这正是您想要的。

由提问者编辑:此代码以 Dagger 2 提供程序方法的形式结合了此答案和 David Karnok 的评论。SimpleMatrix来自EJML。这似乎正在做我要求的事情。

@Provides
@Singleton
@Named(MAGNETOMETER)
public Observable<SimpleMatrix> magnetometer(final SensorManager sensorManager) {
    final PublishSubject<SimpleMatrix> ps = PublishSubject.create();
    final Sensor sensor = sensorManager.getDefaultSensor(TYPE_MAGNETIC_FIELD);
    final SensorEventListener listener = new SensorEventAdapter() {
        @Override
        public void onSensorChanged(final SensorEvent event) {
            ps.onNext(new SimpleMatrix(1, 3, true, event.values));
        }
    };
    return ps.doOnSubscribe(s -> {
        sensorManager.registerListener(listener, sensor, SENSOR_DELAY_NORMAL);
    }).doOnDispose(() -> {
        sensorManager.unregisterListener(listener);
    }).share();
}

推荐阅读