首页 > 解决方案 > 如何强制一个可观察对象在空闲一段时间后始终发出

问题描述

我有一个应该扫描特定 ble 设备的应用程序。我使用的扫描功能不断扫描所有设备,甚至应该报告重复(同一设备可以被扫描多次)。为了组织扫描的数据,我创建了一个 observable,它会发出在列表中找到的所有设备,并删除在特定时间内未扫描的设备:

public findMyDevice() {
    let my_devices: any[] = [];

    return this.ble.startScan().pipe(
        filter(
            (device) => this.deviceIsMyDevice(device)
        ),
        map(
            (device) => {
                device = this.treatData(device);
                my_devices =  my_devices.filter((dev) => dev.id != device.id); // in case it is already in the list
                let now = new Date();
                device.time_scan = now;
                my_devices.push(device);
                my_devices.sort((a, b) => a.sn - b.sn);
                my_devices = this.removeOldDevices(my_devices); // remove scans older than 5 seconds
                return my_devices;
            }
        )
    )
}

如果附近至少有一台设备开启,它可以正常工作。

问题是:如果所有设备都关闭,我的扫描功能永远不会发出,并且永远不会调用 removeOldDevices...这样,一些旧设备会保留在列表中...

我试图解决它添加到我的管道:

timeoutWith(3000,
    of().pipe(
        map(
            () => {my_devices = this.removeOldDevices(my_devices); return my_devices}
        )
    )
)

但它似乎在超时后完成了订阅。解决这个问题的最佳方法是什么?如何在未完成订阅的情况下强制可观察对象在空闲时间后发出?是否有任何其他 rxjs 运营商可以帮助解决这种情况?

标签: typescriptionic5rxjs-observables

解决方案


听起来您想要的是间隔流。

interval(3000),创建一个每 3 秒发出一个数字的流。

const intervalUpdate$ = interval(3000).pipe(
    map(_ => my_devices = this.removeOldDevices(my_devices))
)

这将每 3 秒删除一次旧设备。旁注:请记住,如果您从不退订,这将永远存在。然后您可以(例如)将其与合并一起使用:

const scanUpdate$ = this.ble.startScan().pipe(
    filter(
    [... more code here]
);
return merge(intervalUpdate$, scanUpdate$);
    

如果您想变得更复杂,可以使用 switchMap 将间隔与流混合,以便每次发出设备时启动新的计时器:

// Make a fake object so we can trigger switchMap immediately and get our
// interval/timer stream engaged. There's probably a better way to do this.
const falseStart = {isFakeDevice: true};
// Compose the return stream
return this.ble.startScan().pipe(
    filter(device => this.deviceIsMyDevice(device)),
    // startWith() as a hack to make sure out interval stream gets 
    // switched into right away
    startWith(falseStart),
    // switchMap will create a new stream that emits the given value
    // (normally a device, but will be 'falseStart' to start) and then  
    // emits null every 3 seconds. That logic is restarted every time a  
    // new device arrives, effectively resetting the interval stream
    switchMap(device => 
        interval(3000).pipe(
            mapTo(null),
            startWith(device)
        )
    ),
    // Filtering out our hacked falseStart object; we don't want it
    filter(device => !device?.isFakeDevice),
    map(device => {
        // Your code here, remembering that device might be null if it
        // was triggered by the interval stream
        [...]
        return this.removeOldDevices(my_devices);
    })
);

更“复杂”的方法的缺点是传递的第一个值并不是真正可取的。我可能会将 falseStart 替换为 'null' 而不会费心将其过滤掉。当您订阅此流时,您会立即得到响应,但这通常不会对性能造成太大影响,甚至可能是可取的。

“复杂”方法的好处是,只要您仍在寻找设备,interval(3000) 可能永远不需要发出值。这可能会为您节省很多性能,具体取决于 this.removeOldDevices() 的成本。


推荐阅读