首页 > 解决方案 > 阻塞 RxAndroidBle 写操作

问题描述

如何使用 RxAndroidBle 在 Android 中执行阻塞写入操作。只有当写操作成功时,才应该执行下一个命令。

protected void doWriteBytes(UUID characteristic, byte[] bytes) {
    final Disposable disposable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR)
            .subscribe(
                    value -> {
                        Timber.d("Write characteristic %s: %s",
                                BluetoothGattUuid.prettyPrint(characteristic),
                                byteInHex(value));
                    },
                    throwable -> onError(throwable)
            );

    compositeDisposable.add(disposable);
}


protected void test() {
  // blocking write bytes
  doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12});

  // following command should be only performed if doWriteBytes is successful executed
  foo();

  // blocking write bytes
  doWriteBytes(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1});

  bar();
}

我知道 subscribe 和 onComplete 但也可以不用这些方法吗?

背景是我想在几个不同的子类中覆盖测试方法,所以我可以执行各种 doWriteBytes 命令(例如 ACK 命令)以向蓝牙设备发送一些字节,但我需要确保仅在以下情况下执行下一个命令ACK 命令发送成功。

也许这更像是一个 RxJava2 问题,但我不太熟悉它。

编辑:

感谢您的回答@Dariusz Seweryn。抱歉,我的问题可能不是很清楚。我会尝试具体化它。

我想像 test() 中的普通函数一样编写源代码来抽象 RxJava2 实现。唯一不同的是 doWriteBytes 和其他蓝牙操作(通知、读取)应该通过 RxAndroidBle 完成。我必须写入蓝牙设备的内容取决于 test() 方法中的通知字节或其他算法。此外,我想覆盖 test() 方法来为完全不同的蓝牙设备实现不同的蓝牙通信流程。按顺序处理蓝牙操作始终很重要。

现在我有三个想法:

1)我的第一个想法是实现所有 RxAndroidBle 操作阻塞,所以我可以使用简单的例如循环。

2)我的第二个想法是在运行时动态地将(concat?)添加到 test() 方法中的另一个观察结果,该方法是顺序处理的,但我总是需要先前观察结果的返回值?

3)我的第三个想法是将write/notify/write操作组合成一个方法,可以在test()方法中调用。该操作应该是向字符 A 写入字节,然后等待特性 B 上的通知,对接收到的字节进行一些处理,然后再次写入特性 C。但是在测试中运行时写入的内容或通知过程应该如何动态() 方法添加。

也许在 RxJava2 中我的问题有一个优雅的解决方案,或者根本不可能?

编辑2:

我试图实现所有三个想法,但不幸的是我没有成功。

1)

connectionObservable
                .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .retry(BT_RETRY_TIMES_ON_ERROR)
                .blockingSubscribe(
                        value -> {
                            Timber.d("Write characteristic %s: %s",
                                    BluetoothGattUuid.prettyPrint(characteristic),
                                    byteInHex(value));
                            processBtQueue();
                        },
                        throwable -> onError(throwable)
                );

即使成功也总是阻塞?我必须在某个地方发布它吗?此外,该方法返回 void 而不再是一次性的,但我无法处理它。

2)我在这个想法上挣扎。如果我不知道起始可观察对象,我应该连接到哪个可观察对象?connectionObserable 不起作用,因为它包含 RxBleConnection。第二个问题是蓝牙操作后的值是Java对象类!?我必须每次都投吗?你有一个例子我可以如何将蓝牙写入操作连接到通知蓝牙结果?

3)这个想法的问题是我不知道如何在运行时将处理部分动态添加到 RxJava 订阅部分之外的通知中?

我有一个想法 nr 3 的工作解决方案

protected Observable<byte[]> doWriteNotify(UUID characteristic, byte[] bytes, Observable<byte[]> notificationObservable) {
    Observable observable = connectionObservable
            .flatMapSingle(rxBleConnection -> rxBleConnection.writeCharacteristic(characteristic, bytes))
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR)
        .flatMap( writeBytes -> notificationObservable)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .retry(BT_RETRY_TIMES_ON_ERROR);

    compositeDisposable.add(observable.subscribe());

    return observable;
}

顺便提一句。我应该用这些问题在stackoverflow上创建单独的线程吗?

如果有帮助,你可以在这里找到我的实验源代码。

标签: rx-java2rxandroidble

解决方案


我知道 subscribe 和 onComplete 但也可以不用这些方法吗?

鉴于:

Completable foo() { ... }

Completable bar() { ... )

可以这样做:

Disposable testDisposable = connectionObservable
                                .flatMapCompletable(connection ->
                                    connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x35, 0x12}).ignoreElement()
                                        .andThen(foo())
                                        .andThen(connection.writeCharacteristic(UUID.fromString("f433bd80-75b8-11e2-97d9-0002a5d5c51b"), new byte[] {0x5, 0x6, 0x1}).ignoreElement())
                                        .andThen(bar())
                                )
                                .subscribe(
                                     () -> { /* completed */ },
                                     throwable -> { /* error happened */ }
                                )

用一个关闭上述.subscribe()内容可以控制流程将完成多少个连接。在上面的示例中,如果连接在第一次写入期间提前结束——所有后续操作(foo()、写入、bar())根本不会发生。

编辑:

你所有的想法都有可能奏效——你可以尝试一下。

也许在 RxJava2 中我的问题有一个优雅的解决方案,或者根本不可能?

//如果你真的需要它们,可以使用它们的函数.blocking*()。不过要小心——由于向应用程序引入了更多状态,您可能会开始遇到一些难以调试的问题,具体取决于您的实现。ObservableSingleCompletable


推荐阅读