首页 > 解决方案 > 压缩分组的 IObservable

问题描述

public delegate void QuoteChangeEvent(IQuote q);

var priceChangedObservable = Observable.FromEvent<QuoteChangeEvent, IQuote> 
    (handler =>
    {
        QuoteChangeEvent qHandler = (e) =>
        {
            handler(e);
        };

        return qHandler;
    },
    qHandler => api.MAPI.OnQuoteChange += qHandler,
    qHandler => api.MAPI.OnQuoteChange -= qHandler);

    var grouped = priceChangedObservable
    .GroupBy(instrument => instrument.Symbol);

grouped也是_type IObservable<IGroupedObservable<string, IQuote>>

两个问题。

1)我试图

grouped.SortBy(instrument => instrument.Symbol);

SortBy似乎不存在?

2)假设有两个符号进入,GOOG 和 AAPL 进入grouped。如何使用Zip运算符,以便在我Subscribe是 a时得到什么Tuple<IQuote, IQuote>

我不能完全得到正确的语法。就像是:

Observable.Zip(?, ?, (a, b) => Tuple.Create(a, b))
    .Subscribe(tup => Console.WriteLine("Quotes: {0} {1}", tup.item1, tup.item2));

编辑 1

我几乎明白了:

    var first = grouped.Where(group => group.Key == "GOOG").FirstAsync();
    var second = grouped.Where(group => group.Key == "AAPL").FirstAsync();

    Observable.Zip(first, second, (a, b) => Tuple.Create(a, b))
    .Subscribe(tup => Console.WriteLine("Quotes: {0} {1}", tup.Item1, tup.Item2));

问题是tup不是类型<IQuote, IQuote>,而是类型:

Tuple<IGroupedObservable<string, IQuote>, IGroupedObservable<string, IQuote>>

标签: c#system.reactive

解决方案


这在你看来如何?

IObservable<(IQuote, IQuote)> results =
    grouped
        .Publish(gs =>
            gs
                .Where(g => g.Key == "GOOG")
                .SelectMany(g => g)
                .Zip(
                    gs
                        .Where(g => g.Key == "AAPL")
                        .SelectMany(g => g),
                    (x, y) => (x, y)));

推荐阅读