首页 > 解决方案 > 相互依赖的可观察对象中的错误处理

问题描述

根据反引号的建议进行更新:

    this.ble.connect(macAddr)
    .pipe(
      tap(outer => console.log(`outer observable`)),
      switchMap(() =>
      this.ble.startNotification(macAddr, 
        ENV.CUSTOM_SERVICE,
        ENV.VALUE_CHARACTERISTIC)
          .pipe(
            tap(inner => console.log(`inner observable`)),
            timeout(3000) // <- no further messages
          )
      ),
      // timeout(3000), // <- errors after 18 seconds
      retry(5)
    )
    .subscribe(
      (data) => console.log(`incoming buffer: ${new Uint8Array(data).join(':')}`),
      (error) => console.log(`outer observable ${error}`)
    );

随着内部 observable 的超时,消息将停止,没有进一步的日志。

10:03:45.269 outer observable
10:03:46.058 inner observable
10:03:46.059 incoming buffer: 8:0:138:255:0:0:0:0

随着外部管道的超时,它会在 18 秒后到达主订阅的错误块。这将对应于 3 秒间隔加 1 次重试 5 次。这表明它正在重试内部可观察但没有记录该管道中的水龙头。

09:58:08.426 inner observable
09:58:08.426 incoming buffer: 48:0:138:255:0:0:0:0
09:58:26.516 outer observable handling final TimeoutError: Timeout has occurred

期望的行为是它在任何一个可观察到的错误上重试连接并重新订阅通知特征。

注意:存在异步,因为我必须在初始连接后和通知订阅之前使用承诺在设备上设置模式。为简单起见省略。

await this.ble.write(macAddress, ENV.CUSTOM_SERVICE,
                        ENV.MODE_CHARACTERISTIC, mode);

我一直在换入和换出 retryWhen/switchMap/mergeMap/concatMaps 的变体,这与我可以得到的工作解决方案一样接近。

this.ble.connect(macAddress)
    .pipe(
      retry(5),
      switchMap(async (value, index) => {
        console.log(`in higher order mapping ${index}`);

        return this.ble.startNotification(macAddress, 
                                          ENV.CUSTOM_SERVICE,
                                          ENV.VALUE_CHARACTERISTIC);
          .pipe(
            timeout(BLE_NOTIFICATION_TIMEOUT),
          ).subscribe(
            result => 
              console.log(`incoming buffer: ${new Uint8Array(result).join(':')}`),
            error => {
              console.log(`listening for notifications`, error);
              return throwError(error);
            }
          );
      })
    )
    .subscribe(
      data => console.log(`'next' block of outer observable`, data)
    , error => console.log(`outer observable handling final ${error}`)
)

当应用程序连接到 BLE 设备时,它会订阅具有 Notify 属性的特征。连接或通知 observables 都可能发生错误。在第一种情况下,它足够干净,重新建立连接并订阅通知。在后一种情况下,除非连接中断,否则错误不会出现在外部 observable 上,并且不会发生重试。我不确定我应该以什么方式组合这两个 observables,但如果其中任何一个有错误,我想重试连接,并重新启动通知。

标签: angularerror-handlingrxjsobservablebluetooth-lowenergy

解决方案


有几个问题对您不利:

首先subscribe,对(within )的内部调用switchMap是非惯用的,并且会表现得很奇怪。

作为一个概念,操作符的存在是为了抽象出来,subscribe作为一种转换/控制可观察对象的机制。因此,在运算符的回调中直接订阅 observable与运算符的概念相反,运算符是接受可观察对象并返回新对象的函数。移除对 subscribe 的调用是您在这里需要做的所有事情 - 当您在整个构造上调用 subscribe 时,将自动处理内部 observables 的订阅。

其次,操作符的放置retry意味着重试将仅基于从可观察的连接抛出的错误启动。switchMap如果您还想捕获通知错误,则需要将其放在后面。

第三,您的switchMap回调被声明为async,即使没有await关键字,也将回调的返回值包装在 Promise 中。如果回调的返回值已经是可观察的(或类似可观察的) ,这可能不是您想要做的。

最后(虽然这可能是在您粘贴示例时无意中引入的),回调中的.pipe()调用前面有一个分号switchMap,这会导致语法错误。

这可能是您想要的结构。

this.ble
  .connect(macAddress)
  .pipe(
    switchMap(() =>
      this.ble
        .startNotification(
          macAddress,
          ENV.CUSTOM_SERVICE,
          ENV.VALUE_CHARACTERISTIC
        )
        .pipe(timeout(BLE_NOTIFICATION_TIMEOUT))
    ),
    retry(5) // Capture failures of connection *or* notification timeout
  )
  .subscribe(
    (data) => console.log(`'next' block of outer observable`, data),
    (error) => console.log(`outer observable handling final ${error}`)
  );

推荐阅读