rx-java2 - 阻塞 RxAndroidBle 写操作
问题描述
如何使用 RxAndroidBle 在 Android 中执行阻塞写入操作。只有当写操作成功时,才应该执行下一个命令。
protected void doWriteBytes(UUID characteristic, byte[] bytes) {
final Disposable disposable = connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.subscribe(
value -> {
Timber.d("Write characteristic %s: %s",
BluetoothGattUuid.prettyPrint(characteristic),
byteInHex(value));
},
throwable -> onError(throwable)
);
compositeDisposable.add(disposable);
}
protected void test() {
// blocking write bytes
doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});
// following command should be only performed if doWriteBytes is successful executed
foo();
// blocking write bytes
doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});
bar();
}
我知道 subscribe 和 onComplete 但也可以不用这些方法吗?
背景是我想在几个不同的子类中覆盖测试方法,所以我可以执行各种 doWriteBytes 命令(例如 ACK 命令)以向蓝牙设备发送一些字节,但我需要确保仅在以下情况下执行下一个命令ACK 命令发送成功。
也许这更像是一个 RxJava2 问题,但我不太熟悉它。
编辑:
感谢您的回答@Dariusz Seweryn。抱歉,我的问题可能不是很清楚。我会尝试具体化它。
我想像 test() 中的普通函数一样编写源代码来抽象 RxJava2 实现。唯一不同的是 doWriteBytes 和其他蓝牙操作(通知、读取)应该通过 RxAndroidBle 完成。我必须写入蓝牙设备的内容取决于 test() 方法中的通知字节或其他算法。此外,我想覆盖 test() 方法来为完全不同的蓝牙设备实现不同的蓝牙通信流程。按顺序处理蓝牙操作始终很重要。
现在我有三个想法:
1)我的第一个想法是实现所有 RxAndroidBle 操作阻塞,所以我可以使用简单的例如循环。
2)我的第二个想法是在运行时动态地将(concat?)添加到 test() 方法中的另一个观察结果,该方法是顺序处理的,但我总是需要先前观察结果的返回值?
3)我的第三个想法是将write/notify/write操作组合成一个方法,可以在test()方法中调用。该操作应该是向字符 A 写入字节,然后等待特性 B 上的通知,对接收到的字节进行一些处理,然后再次写入特性 C。但是在测试中运行时写入的内容或通知过程应该如何动态() 方法添加。
也许在 RxJava2 中我的问题有一个优雅的解决方案,或者根本不可能?
编辑2:
我试图实现所有三个想法,但不幸的是我没有成功。
1)
connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.blockingSubscribe(
value -> {
Timber.d("Write characteristic %s: %s",
BluetoothGattUuid.prettyPrint(characteristic),
byteInHex(value));
processBtQueue();
},
throwable -> onError(throwable)
);
即使成功也总是阻塞?我必须在某个地方发布它吗?此外,该方法返回 void 而不再是一次性的,但我无法处理它。
2)我在这个想法上挣扎。如果我不知道起始可观察对象,我应该连接到哪个可观察对象?connectionObserable 不起作用,因为它包含 RxBleConnection。第二个问题是蓝牙操作后的值是Java对象类!?我必须每次都投吗?你有一个例子我可以如何将蓝牙写入操作连接到通知蓝牙结果?
3)这个想法的问题是我不知道如何在运行时将处理部分动态添加到 RxJava 订阅部分之外的通知中?
我有一个想法 nr 3 的工作解决方案
protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
Observable observable = connectionObservable
.flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR)
.flatMap( writeBytes -> notificationObservable)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.retry(BT_RETRY_TIMES_ON_ERROR);
compositeDisposable.add(observable.subscribe());
return observable;
}
顺便提一句。我应该用这些问题在stackoverflow上创建单独的线程吗?
如果有帮助,你可以在这里找到我的实验源代码。
解决方案
我知道 subscribe 和 onComplete 但也可以不用这些方法吗?
鉴于:
Completable foo() { ... }
Completable bar() { ... )
可以这样做:
Disposable testDisposable = connectionObservable
.flatMapCompletable(connection ->
connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12}).ignoreElement()
.andThen(foo())
.andThen(connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1}).ignoreElement())
.andThen(bar())
)
.subscribe(
() -> { /* completed */ },
throwable -> { /* error happened */ }
)
用一个关闭上述.subscribe()
内容可以控制流程将完成多少个连接。在上面的示例中,如果连接在第一次写入期间提前结束——所有后续操作(foo()
、写入、bar()
)根本不会发生。
编辑:
你所有的想法都有可能奏效——你可以尝试一下。
也许在 RxJava2 中我的问题有一个优雅的解决方案,或者根本不可能?
//如果你真的需要它们,可以使用它们的函数.blocking*()
。不过要小心——由于向应用程序引入了更多状态,您可能会开始遇到一些难以调试的问题,具体取决于您的实现。Observable
Single
Completable
推荐阅读
- python - 只能用频谱图重建音频吗?
- sql-server - SQL Server 查询需要很长时间才能执行
- android - 如何保持一个 RecyclerView 项目上的开关不会影响另一个项目上的开关?
- javascript - 转换前 React Pose 组件更改
- python - 多个 NCS2 设备进行推理
- amazon-web-services - 访问根路径中的多个 S3 存储桶作为 CloudFront 分配中的源?
- sql - 如何列出正在计算的值?
- python - 来自节点的内容文本的 xpath
- flutter - 如何让两个 ListView.builder 在同一个滚动条上颤动?
- javascript - 如何在javascript中找到平均值?