首页 > 解决方案 > 使用 Rx,如何获取每个唯一键的最新值?

问题描述

如何获取每个唯一键的最新值?

在这个示例中,我有股票代码和价格。我如何组合它以输出具有最新价格的独特股票?

在现实生活中,这种事件流将持续数年,我只需要股票的当前价格。

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace LearnRx
{
    class Program
    {
        public class Stock
        {
            public string Symbol { get; set; }
            public float Price { get; set; }
            public DateTime ModifiedAt { get; set; }

            public override string ToString()
            {
                return "Stock: " + Symbol + " " + Price + " " + ModifiedAt;
            }
        }

        static void Main(string[] args)
        {
            var baseTime = new DateTime(2000, 1, 1, 1, 1, 1);
            var fiveMinutes = TimeSpan.FromMinutes(5);
            var stockEvents = new Stock[]
            {
                new Stock
                {
                    Symbol = "MSFT",
                    Price = 100,
                    ModifiedAt = baseTime
                },
                new Stock
                {
                    Symbol = "GOOG",
                    Price = 200,
                    ModifiedAt = baseTime + fiveMinutes
                },
                new Stock
                {
                    Symbol = "MSFT",
                    Price = 150,
                    ModifiedAt = baseTime + fiveMinutes + fiveMinutes
                },
                new Stock
                {
                    Symbol = "AAPL",
                    Price = 300,
                    ModifiedAt = baseTime + fiveMinutes + fiveMinutes + fiveMinutes
                },
            }.ToObservable();

            var scheduler = new HistoricalScheduler();
            var replay = Observable.Generate(
                stockEvents.GetEnumerator(),
                events => events.MoveNext(),
                events => events,
                events => events.Current,
                events => events.Current.ModifiedAt,
                scheduler);


            replay
                .Subscribe( i => Console.WriteLine($"Event: {i} happened at {scheduler.Now}"));

            scheduler.Start();
        }
    }
}

标签: system.reactive

解决方案


听起来您想要 aGroupBy后跟 aCombineLatest应该存在,但不存在。

这是缺少的CombineLatest

public static class X
{
    
    public static IObservable<ImmutableDictionary<TKey, TValue>> CombineLatest<TKey, TValue>(this IObservable<IGroupedObservable<TKey, TValue>> source)
    {
        return source
            .SelectMany(o => o.Materialize().Select(n => (Notification: n, Key: o.Key)))
            .Scan(default(Notification<ImmutableDictionary<TKey, TValue>>), (stateNotification, t) => {
                var state = stateNotification?.Value ?? ImmutableDictionary<TKey, TValue>.Empty;
                switch(t.Notification.Kind) {
                    case NotificationKind.OnError:
                        return Notification.CreateOnError<ImmutableDictionary<TKey, TValue>>(t.Notification.Exception);
                    case NotificationKind.OnCompleted:
                        return Notification.CreateOnNext(state.Remove(t.Key));
                    case NotificationKind.OnNext:
                        return Notification.CreateOnNext(state.SetItem(t.Key, t.Notification.Value));
                    default:
                        throw new NotImplementedException();
                }
            })
            .Dematerialize();
    }
}

然后是最后的查询Main

var prices = replay
    .GroupBy(s => s.Symbol)
    .CombineLatest()

这将为您Observable<Dictionary<string, Stock>>提供每个价格点。这应该使您能够在下游执行任何操作。


推荐阅读