rx-java2 - 如何创建一个在订阅时激活的多播 observable?
问题描述
我想融合几个 Android 传感器的输入并将输出公开为支持多个同时观察者的可观察对象(或至少可以订阅的对象)。解决这个问题的惯用方法是什么?标准库中是否有一个类可以作为一个好的起点?
我正在考虑用一个或多个测试激活传感器的方法PublishSubject
的委托将 a 包装在一个对象中,并将返回的包装在一个测试以停用它们的代理中。像这样的东西,虽然这已经有一些明显的问题:subscribe
hasObservers
Disposable
hasObservers
public class SensorSubject<T> {
private final PublishSubject<T> mSubject = PublishSubject.create();
public Disposable subscribe(final Consumer<? super T> consumer) {
final Disposable d = mSubject.subscribe(consumer);
if(mSubject.hasObservers()) {
// activate sensors
}
return new Disposable() {
@Override
public void dispose() {
// possible race conditions!
if(!isDisposed()) {
d.dispose();
if(!mSubject.hasObservers()) {
// deactivate sensors
}
}
}
@Override
public boolean isDisposed() {
return d.isDisposed();
}
};
}
}
解决方案
在 RxJava 中这样做的惯用方法是使用hot observable。
当有人订阅冷可观察对象并将所有项目发送给该订阅者时,它们会执行一些操作。所以这是1对1的关系。
Hot observable执行一些操作并在单独订阅时独立发出项目。因此,如果您订阅得太晚,您可能无法获得一些较早发出的值。这是一对多的关系,也就是多播——这就是你想要的。
通常的方法是Flowable.publish()
进行Flowable
多播,但需要调用connect()
方法来开始发射值。
在您的情况下,您还可以调用refCount()
which 添加您想要的功能 - 当至少有一个订阅时它订阅源 Flowable 并在每个人都取消订阅时取消订阅。
因为publish().refCount()
是相当流行的组合,所以他们有一个捷径 - share()
。据我了解,这正是您想要的。
由提问者编辑:此代码以 Dagger 2 提供程序方法的形式结合了此答案和 David Karnok 的评论。SimpleMatrix
来自EJML。这似乎正在做我要求的事情。
@Provides
@Singleton
@Named(MAGNETOMETER)
public Observable<SimpleMatrix> magnetometer(final SensorManager sensorManager) {
final PublishSubject<SimpleMatrix> ps = PublishSubject.create();
final Sensor sensor = sensorManager.getDefaultSensor(TYPE_MAGNETIC_FIELD);
final SensorEventListener listener = new SensorEventAdapter() {
@Override
public void onSensorChanged(final SensorEvent event) {
ps.onNext(new SimpleMatrix(1, 3, true, event.values));
}
};
return ps.doOnSubscribe(s -> {
sensorManager.registerListener(listener, sensor, SENSOR_DELAY_NORMAL);
}).doOnDispose(() -> {
sensorManager.unregisterListener(listener);
}).share();
}
推荐阅读
- mysql - 数据建模和实现关系[1对多][多对多]等
- node.js - 我可以在不安装 Express 的情况下使用 body-parser 模块吗?
- c++ - 如何从 phinode 及其在 LLVM 中的相应基本块中获取标签?
- node.js - 号码的 Hapi/Joi 验证失败
- php - 未定义的偏移量 [PHP / JQUERY]
- typescript - 嵌套函数调用中的打字稿异步等待
- git - Git将不同的分支推送到不同的远程仓库?
- gcloud - Gcloud:从模板获取输出
- oracle - 无法在 oracle 12c Realease II 数据库中使用系统/管理器登录
- c++ - 如何在 MFC 中的 CArray 中使用 CArray?