首页 > 解决方案 > 需要状态的自包含响应式扩展辅助方法

问题描述

查看https://eprystupa.wordpress.com/2009/12/18/detecting-running-highlow-prices-using-reactive-extensions-for-net/它有一个有趣的代码块:

var rnd = new Random();
var feed = Observable.Defer(() =>
    Observable.Return(Math.Round(30.0 + rnd.NextDouble(), 2))
    .Delay(TimeSpan.FromSeconds(1 * rnd.NextDouble())))
    .Repeat();

// Daily low price feed
double min = double.MaxValue;
var feedLo = feed
    .Where(p => p < min)
    .Do(p => min = Math.Min(min, p))
    .Select(p => "New LO: " + p);

// Daily high price feed
double max = double.MinValue;
var feedHi = feed
    .Where(p => p > max)
    .Do(p => max = Math.Max(max, p))
    .Select(p => "New HI: " + p);

// Combine hi and lo in one feed and subscribe to it
feedLo.Merge(feedHi).Subscribe(Console.WriteLine);

上面是好的并且可以完成工作,但是局部变量maxmin意味着代码非常具体,而我想将NewLowHi代码/指标附加到现有IObservable<double>的很像https://github.com/fiatsasia/Financier有:

public static IObservable<TSource> SimpleMovingAverage<TSource>(this IObservable<TSource> source, int period)
{
    return source.Buffer(period, 1).Select(e => e.Average());
}

创建一个独立的 NewLowHi 指标的最佳实践是什么,我可以在不使用(或至少在内部隐藏)局部变量max和的情况下订阅它min

标签: c#system.reactive

解决方案


您在 WordPress 网站上引用的代码存在一些缺陷。

由于他们创建的方式,feed它是一个热门的可观察的,因为每个订阅都会收到一组不同的数字。所以 thefeedLofeedHiobservables 将使用不同的变量集。

但它变得更糟。例如,如果对 进行了两次订阅feedLo,那么将有两个订阅feed但只有一个状态变量,min这意味着输出的值将是两个订阅的最小值,而不是每个订阅的最小值。

我将展示如何正确执行此操作,但首先您的问题是关于如何封装状态。就是这样:

IObservable<T> feed =
    Observable
        .Defer(() =>
        {
            int state = 42;
            return Observable... // define your observable here.
        });

现在,feedRandom用于其状态。我们可以继续feed使用上面的模式重写。

var feed =
    Observable
        .Defer(() =>
        {
            var rnd = new Random();
            return
                Observable
                    .Generate(
                        0, x => true, x => x,
                        x => Math.Round(30.0 + rnd.NextDouble(), 2),
                        x => TimeSpan.FromSeconds(rnd.NextDouble()));
        });

我更喜欢使用Observable.GenerateDefer///模式ReturnDelayRepeat

现在了解如何获取最小值和最大值。

我想要一个IObservable<(State state, double value)>从单一订阅到源 observable 的高值和低值。如下State所示:

public enum State
{
    High,
    Low,
}

这是我的观察:

IObservable<(State state, double value)> feedHighLow(IObservable<double> source) =>
    source.Publish(xs => Observable.Merge(
        xs.Scan(Math.Min).DistinctUntilChanged().Select(x => (state: State.Low, value: x)),
        xs.Scan(Math.Max).DistinctUntilChanged().Select(x => (state: State.High, value: x))));

现在我可以调用feedHighLow(feed)并从单个订阅源获取High/值流。LowPublish调用确保了对源的一次订阅,这Merge意味着我可以运行两个不同的 observable 来分别获取最小值和最大值。

我得到这样的结果:

输出


推荐阅读