Why does ReplaySubject behave differently when subscribed to different observables that produce the same sequence?

Question

Example 1 creates the observable with Observable.Interval:

    var observable1 = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3); 
    var replaySubject1 = new ReplaySubject<long>(); 
    observable1.Subscribe(replaySubject1); // subscribe 
    replaySubject1.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
    replaySubject1.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
    replaySubject1.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));

The output is:

first:0
second:0
third:0
first:1
second:1
third:1
first:2
second:2
third:2

Example 2 creates the observable with Observable.Create:

        var observable2 = Observable.Create<long>(observer =>
        {
            for (var i = 0; i <= 2; i++)
            {
                observer.OnNext(i);
                Thread.Sleep(1000);
            }
            observer.OnCompleted();
            return Disposable.Empty;
        }); 

        var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));
        observable2.Subscribe(replaySubject2); // subscribe 
        replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
        replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
        replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));

The output is:

first:0
first:1
first:2
second:0
second:1
second:2
third:0
third:1
third:2

I assumed both examples would produce the same output, but I was wrong. Why?

Tags: c#, .net, system.reactive

Answer


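This happens because the code inside Observable.Create runs synchronously when you subscribe the ReplaySubject to the observable. The call observable2.Subscribe(replaySubject2) does not return until the loop has pushed all three values and called OnCompleted, so by the time the observers are attached the subject has already buffered the whole sequence, and it replays that sequence in full to each observer in turn. Observable.Interval, by contrast, emits its values from a timer, so Subscribe returns immediately and all three observers are attached before the first value arrives.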
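One way to see the difference is to time how long each Subscribe call takes. The following is a minimal sketch, assuming a console project with the System.Reactive package and C# top-level statements; the names blocking, timer, and stopwatch are introduced here only for illustration.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Same shape as Example 2: the loop runs on the subscribing thread.
var blocking = Observable.Create<long>(observer =>
{
    for (var i = 0; i <= 2; i++)
    {
        observer.OnNext(i);
        Thread.Sleep(1000);
    }
    observer.OnCompleted();
    return Disposable.Empty;
});

// Same shape as Example 1: values are produced later by a timer.
var timer = Observable.Interval(TimeSpan.FromSeconds(1)).Take(3);

var stopwatch = Stopwatch.StartNew();
timer.Subscribe(new ReplaySubject<long>());
Console.WriteLine($"Interval: Subscribe returned after {stopwatch.ElapsedMilliseconds} ms");

stopwatch.Restart();
blocking.Subscribe(new ReplaySubject<long>());
Console.WriteLine($"Create: Subscribe returned after {stopwatch.ElapsedMilliseconds} ms");
```

The first call should return almost immediately, while the second should take roughly three seconds, because it only returns once the loop inside Observable.Create has finished.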

Try this asynchronous version:

var observable2 = Observable.Create<long>(async observer =>
{
    for (var i = 0; i <= 2; i++)
    {
        observer.OnNext(i);
        await Task.Delay(1000);
    }
    observer.OnCompleted();
    return Disposable.Empty;
});

var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));
observable2.Subscribe(replaySubject2); // subscribe 
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));

Alternatively, take a look at SubscribeOn/ObserveOn.
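As a rough sketch of the SubscribeOn route, assuming the System.Reactive package and C# top-level statements: the blocking Observable.Create from the question is left unchanged, and SubscribeOn moves the subscription, and with it the loop, onto another thread. The choice of TaskPoolScheduler.Default and the final Wait() call are assumptions made for this illustration, not part of the original answer.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// The original synchronous observable from Example 2.
var observable2 = Observable.Create<long>(observer =>
{
    for (var i = 0; i <= 2; i++)
    {
        observer.OnNext(i);
        Thread.Sleep(1000);
    }
    observer.OnCompleted();
    return Disposable.Empty;
});

var replaySubject2 = new ReplaySubject<long>(TimeSpan.FromMinutes(1));

// SubscribeOn runs the subscribe logic on the given scheduler,
// so this call returns without waiting for the loop to finish.
observable2.SubscribeOn(TaskPoolScheduler.Default).Subscribe(replaySubject2);

replaySubject2.Subscribe(onNext: x => Console.WriteLine($"first:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"second:{x}"));
replaySubject2.Subscribe(onNext: x => Console.WriteLine($"third:{x}"));

// Block the main thread until the subject completes so the process
// does not exit before the values have been printed.
replaySubject2.Wait();
```

Because the loop now runs concurrently with the three Subscribe calls, the very first value may reach an observer through replay rather than live, so the exact ordering of the lines for 0 is not guaranteed. The later values should arrive interleaved across the observers, as in Example 1.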

