首页 > 解决方案 > 具有反应性扩展的阈值检测

问题描述

在花了一些时间在反应性扩展上后,我重写了这个问题——我可能知道现在很危险,所以请耐心等待,并提前致谢。

我有以下检测级别的代码:

static class LevelExtensions
{
    public static IEnumerable<double> GetCrossovers(this double[] self, double x1, double x2)
    {
        return from level in self
               where level >= x1 && level <= x2 || level <= x1 && level >= x2
               select level;
    }

    public static IObservable<ThresholdCrossedEvent> ThresholdDetection(this IObservable<double> source, double[] thresholds)
    {
        return source
                .Buffer(2, 1)
                .Where(x => x.Count == 2)
                .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
                .Where(x => x.LevelsCrossed.ToList().Count > 0)
                .SelectMany(x => from level in x.LevelsCrossed select new ThresholdCrossedEvent(level, x.Previous, x.Current));
    }
}

public class ThresholdCrossedEvent
{
    public ThresholdCrossedEvent(double level, double previous, double current)
    {

        Threshold = level;
        Previous = previous;
        Current = current;
    }

    public double Threshold { get; set; }
    public double Previous { get; set; }
    public double Current { get; set; }
    public Direction SlopeDirection => Current >= Previous ? Direction.Up : Direction.Down;
    public bool IsTouching => Current == Threshold || Previous == Threshold;
}


var feed = new double[] { 1, 2, 3, 5, 5.5, 6, 9, 12, 10, 9, 7.5, 6.5, 7 }.ToObservable();
var levels = new double[] { 5, 7, 8 };

feed
  .ThresholdDetection(levels)
  .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));

它会按预期产生事件,到目前为止还可以:

{"Threshold":5.0,"Previous":3.0,"Current":5.0,"IsUpward":true,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"IsUpward":true,"IsTouching":true}
{"Threshold":7.0,"Previous":6.0,"Current":9.0,"IsUpward":true,"IsTouching":false}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"IsUpward":true,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"IsUpward":false,"IsTouching":false}
{"Threshold":7.0,"Previous":7.5,"Current":6.5,"IsUpward":false,"IsTouching":false}
{"Threshold":7.0,"Previous":6.5,"Current":7.0,"IsUpward":true,"IsTouching":true}

有什么表演的味道吗?

没有过早优化——还有什么会引起性能问题的(因为我每秒将处理 40,000 多条消息)?

我注意到我的GetLevelsBetweenPair水平交叉检测将依赖Linq.Where每秒数千次更新,并且我的levels序列应该从低到高排序 - 是否Linq为我优化或我应该考虑的任何事情?

我将如何处理levels要在运行时更新的列表?

我从消息总线调用OnNext我的feed列表发布值,然后通过用户界面/命令我添加/删除levels列表中的一个元素(并且它需要在修改后自动重新排序)。

鉴于不断检查这将是一个高度竞争的数组(但根本不会经常更新),这个级别列表更新过程应该考虑哪些事情?

哪种结构可以以最有效的方式允许门禁访问?

拥有不同的列表levels并在其上调用相同的订阅

我会有不同的“类别”级别,每个级别都创建相同的 ThresholdCrossedEvent,但意味着不同的东西,即 HistoricalThreshold 或 WeeklyThreshold。

可以(最好)将以下作为多个订阅执行,还是应该将此阈值类型数据编码到警报中并将其从 double[] 更改为包含必要字段的类对象?

feed
  .ThresholdDetection(historicalThresholds)
  .Subscribe(x => Console.WriteLine("Historical " + JsonConvert.SerializeObject(x)));

feed
  .ThresholdDetection(weeklyThresholds)
  .Subscribe(x => Console.WriteLine("Weekly: " + JsonConvert.SerializeObject(x)));

标签: reactive-programmingsystem.reactive

解决方案


你的实现看起来不错。几件事:

  1. 您还想将变化建模levels为可观察的。
  2. 我不是最好的 Linq2Objects 专家,但我认为列表/数组转换是不必要的,并且对性能有轻微影响(如果你担心的话)。显然是测试。
  3. 我讨厌 Linq 语法,所以我在下面的示例中删除了它。我不认为这很重要。

然后解决方案如下所示:

static class LevelExtensions
{
    public static IEnumerable<double> GetCrossovers(this IEnumerable<double> self, double x1, double x2)
    {
        return self
            .Where(level => level >= x1 && level <= x2 || level <= x1 && level >= x2);
    }

    public static IObservable<ThresholdCrossedEvent> ThresholdDetection(this IObservable<double> source, IObservable<double[]> thresholdSource)
    {
        return thresholdSource
            .Select(thresholds => source
                .Buffer(2, 1)
                .Where(x => x.Count == 2)
                .Select(x => new { LevelsCrossed = thresholds.GetCrossovers(x[0], x[1]), Previous = x[0], Current = x[1] })
                .Where(x => x.LevelsCrossed.Count() > 0)
                .SelectMany(x => x.LevelsCrossed.Select(lc => new ThresholdCrossedEvent(lc, x.Previous, x.Current)))
            )
            .Switch();
    }
}

这是对您的代码的直接改编:唯一的区别是 external Select、 theSwitch和类型。对于每个levels通知,newSelect创建一个新的 Observable。Switch意味着总是切换到最新的,并放弃旧的。所以这样你总是使用最新的levels阈值。

我通过运行以下代码对其进行了测试:

var feed = new double[] { 1, 2, 3, 5, 5.5, 6, 9, 12, 10, 9, 7.5, 6.5, 7 };
var levels = new double[] { 5, 7, 8 };

var feedSubject = new Subject<double>();
var levelsSubject = new BehaviorSubject<double[]>(levels);

feedSubject
    .ThresholdDetection(levelsSubject)
    .Subscribe(x => Console.WriteLine(JsonConvert.SerializeObject(x)));
    
foreach (var d in feed)
    feedSubject.OnNext(d);

levelsSubject.OnNext(new double[] { 5, 8 });
Console.WriteLine("---");

foreach (var d in feed)
    feedSubject.OnNext(d);

它使用您的原始阈值运行一次该提要数组,然后我在删除 7 阈值的情况下运行它。我的输出是这样的:

{"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
{"Threshold":7.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}
{"Threshold":7.0,"Previous":7.5,"Current":6.5,"SlopeDirection":1,"IsTouching":false}
{"Threshold":7.0,"Previous":6.5,"Current":7.0,"SlopeDirection":0,"IsTouching":true}
---
{"Threshold":5.0,"Previous":3.0,"Current":5.0,"SlopeDirection":0,"IsTouching":true}
{"Threshold":5.0,"Previous":5.0,"Current":5.5,"SlopeDirection":0,"IsTouching":true}
{"Threshold":8.0,"Previous":6.0,"Current":9.0,"SlopeDirection":0,"IsTouching":false}
{"Threshold":8.0,"Previous":9.0,"Current":7.5,"SlopeDirection":1,"IsTouching":false}

推荐阅读