首页 > 解决方案 > 压缩超过 2 个可观察序列的问题

问题描述

我正在寻找为什么Zip操作员不能使用超过 2 个可观察流的信息:

var stream1 = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        var stream2 = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        var stream3 = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        var stream4 = Observable.Create<int>(o =>
        {
            o.OnNext(1);
            o.OnCompleted();
            return Disposable.Empty;
        });

        var stream6 = stream1.Zip(stream2, stream3, stream4, (a, b, c, d) =>
        {
            return a + b + c + d;
        });

        var i = stream6.ToTask().GetAwaiter().GetResult();
        Console.WriteLine(i);
        Console.ReadKey();

我收到的错误是:

序列不包含任何元素

Zip我只压缩两个序列时,操作员工作正常。

下面我粘贴了堆栈跟踪:

   in System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   in System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   in System.Runtime.CompilerServices.TaskAwaiter`1.GetResult()
   in ConsoleApp1.Program.Main(String[] args) w Path\Program.cs:line 58

标签: c#.net.net-coresystem.reactive

解决方案


在我看来,它就像一个错误,或者充其量是粗制滥造的设计。为了让事情更简单:

var stream = Observable.Return(1);

var result2 = await stream.Zip(stream, (a, b) => (a, b));
var result3 = await stream.Zip(stream, stream, (a, b, c) => (a, b, c));

Console.WriteLine($"result2 = {result2}");
Console.WriteLine($"result3 = {result3}");

result2有效,因为 zip 会产生一个值。可result3观察对象没有产生值,因此等待失败。但是,它应该产生一个值。这是有关该重载的文档:

每当所有可观察序列都在相应索引处产生元素时,使用选择器函数将指定的可观察序列合并为一个可观察序列。

由于它们都在索引 0 处产生了一个值,因此您应该会看到一个值。所以...错误。

有趣的是,如果您重新定义stream如下:

var stream = Observable.Return(1).Delay(TimeSpan.FromMilliseconds(15));

...然后两者都起作用。错误可能与某些竞争条件有关。

我认为成对函数(有 2 个 observables)比 n-wise 重载更老,测试得更好。


推荐阅读