首页 > 解决方案 > 在“长”间隔后完成可观察序列

问题描述

下面的可观察序列将每个元素添加到 ReplaySubject 中,以便我以后可以访问任何元素,甚至等待 ReplaySubject 的完成。它在达到时间跨度后完成 ReaplySubject。

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(1)))
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

我希望序列一直运行到自上次“OnNext”调用以来的给定时间,然后完成。这在各种蓝牙通信场景中非常有用,蓝牙设备会给我一个数据序列,然后在没有任何完成消息或事件的情况下停止。在这些场景中,我需要启发式地确定序列何时完成,然后自己完成。因此,如果自上次蓝牙通​​知以来已经“太长”了,我想完成 ReplaySubject。

我可以通过创建一个计时器,在收到每个元素时重置它,然后在计时器达到“太长”时完成 ReplaySubject 来做到这一点,但我听说创建一个对象并从可观察订阅中操作它不是线程安全。

关于如何在“太长”间隔后完成序列的任何建议?

这是一个我听说的不是线程安全的版本,但应该按预期工作:

bool reading = true;
System.Timers.Timer timer = new System.Timers.Timer(1000);
timer.Elapsed += (sender, e) =>
{
    reading = false;
};

ReplaySubject<string> rfidPlayer = new ReplaySubject<string>();
characteristic.WhenNotificationReceived()
    .TakeWhile(x => reading)
    .Subscribe(
        onNext: result =>
        {
            string nextTag = BitConverter.ToString(result.Data);
            nextTag = nextTag.Replace("-", "");

            timer.Stop();
            timer.Start();

            rfidPlayer.OnNext(nextTag);
        },
        onCompleted: () =>
        {
            rfidPlayer.OnCompleted();
        });

根据 Simonare 的第一个答案,这似乎令人满意:

                characteristic.WhenNotificationReceived()
                .Timeout(TimeSpan.FromSeconds(1))
                .Subscribe(
                    onNext: result =>
                    {
                        string nextTag = BitConverter.ToString(result.Data);
                        nextTag = nextTag.Replace("-", "");

                        rfidPlayer.OnNext(nextTag);
                    },
                    onError: error =>
                    {
                        rfidPlayer.OnCompleted();
                    });

标签: c#asynchronoussystem.reactive

解决方案


您可以考虑使用Timeout Operator。唯一的缺点是它以错误信号终止。您可能需要处理错误错误

如果 Observable 在指定的时间范围内未能发出任何项目,Timeout 运算符允许您使用 onError 终止来中止 Observable。

如果您使用下面的方法,您可以超越错误

 .Timeout(200, Promise.resolve(42));

如果触发了超时条件,另一种变体允许您指示超时切换到您指定的备份 Observable,而不是因错误而终止。


characteristic.WhenNotificationReceived()
    .Timeout(TimeSpan.FromSeconds(1))
    .Subscribe(
        onNext: result =>
        {
            ....
            rfidPlayer.OnNext(....);
        },
        onError: error =>
        {
            rfidPlayer.OnCompleted();
        });

推荐阅读