c# - 需要状态的自包含响应式扩展辅助方法
问题描述
查看https://eprystupa.wordpress.com/2009/12/18/detecting-running-highlow-prices-using-reactive-extensions-for-net/它有一个有趣的代码块:
var rnd = new Random();
var feed = Observable.Defer(() =>
Observable.Return(Math.Round(30.0 + rnd.NextDouble(), 2))
.Delay(TimeSpan.FromSeconds(1 * rnd.NextDouble())))
.Repeat();
// Daily low price feed
double min = double.MaxValue;
var feedLo = feed
.Where(p => p < min)
.Do(p => min = Math.Min(min, p))
.Select(p => "New LO: " + p);
// Daily high price feed
double max = double.MinValue;
var feedHi = feed
.Where(p => p > max)
.Do(p => max = Math.Max(max, p))
.Select(p => "New HI: " + p);
// Combine hi and lo in one feed and subscribe to it
feedLo.Merge(feedHi).Subscribe(Console.WriteLine);
上面是好的并且可以完成工作,但是局部变量max
和min
意味着代码非常具体,而我想将NewLowHi代码/指标附加到现有IObservable<double>
的很像https://github.com/fiatsasia/Financier有:
public static IObservable<TSource> SimpleMovingAverage<TSource>(this IObservable<TSource> source, int period)
{
return source.Buffer(period, 1).Select(e => e.Average());
}
创建一个独立的 NewLowHi 指标的最佳实践是什么,我可以在不使用(或至少在内部隐藏)局部变量max
和的情况下订阅它min
?
解决方案
您在 WordPress 网站上引用的代码存在一些缺陷。
由于他们创建的方式,feed
它是一个热门的可观察的,因为每个订阅都会收到一组不同的数字。所以 thefeedLo
和feedHi
observables 将使用不同的变量集。
但它变得更糟。例如,如果对 进行了两次订阅feedLo
,那么将有两个订阅feed
但只有一个状态变量,min
这意味着输出的值将是两个订阅的最小值,而不是每个订阅的最小值。
我将展示如何正确执行此操作,但首先您的问题是关于如何封装状态。就是这样:
IObservable<T> feed =
Observable
.Defer(() =>
{
int state = 42;
return Observable... // define your observable here.
});
现在,feed
源Random
用于其状态。我们可以继续feed
使用上面的模式重写。
var feed =
Observable
.Defer(() =>
{
var rnd = new Random();
return
Observable
.Generate(
0, x => true, x => x,
x => Math.Round(30.0 + rnd.NextDouble(), 2),
x => TimeSpan.FromSeconds(rnd.NextDouble()));
});
我更喜欢使用Observable.Generate
比Defer
///模式Return
。Delay
Repeat
现在了解如何获取最小值和最大值。
我想要一个IObservable<(State state, double value)>
从单一订阅到源 observable 的高值和低值。如下State
所示:
public enum State
{
High,
Low,
}
这是我的观察:
IObservable<(State state, double value)> feedHighLow(IObservable<double> source) =>
source.Publish(xs => Observable.Merge(
xs.Scan(Math.Min).DistinctUntilChanged().Select(x => (state: State.Low, value: x)),
xs.Scan(Math.Max).DistinctUntilChanged().Select(x => (state: State.High, value: x))));
现在我可以调用feedHighLow(feed)
并从单个订阅源获取High
/值流。Low
该Publish
调用确保了对源的一次订阅,这Merge
意味着我可以运行两个不同的 observable 来分别获取最小值和最大值。
我得到这样的结果:
推荐阅读
- laravel - 没有主键的 Laravel NOVA 会出错
- asp.net-mvc - 有没有办法在 ASP.NET MVC 中集成 Material-components-web
- keras - keras中的矩阵时间向量
- amazon-web-services - 我想定期将文件从本地服务器推送到 AWS S3
- css - 使用 Node.js (Ubuntu 18.04) 将 CSS 和图像链接到我在远程 Express 服务器上的 EJS 模板
- ckeditor - CK编辑器。光标在起始位置且没有选择时如何区分window.getSelection()
- javascript - 在这种情况下,如何使用 Firebase Cloud Functions 避免承诺嵌套?
- mongodb - 无法通过 kubectl port-forward 连接到 mongodb 副本集
- c# - 有没有办法指定如何解析 Newtonsoft.Json 中的某些属性?
- c# - Android Xamarin 检测从任务管理器开始