首页 > 解决方案 > 使用 Rx .NET 从另一个 observable 通知异步主题

问题描述

我正在尝试通知Subject _sub从另一个可观察对象订阅的听众,然后在Do处理程序中记录一些消息。我正在调用 OnNext,如果_sub不是异步的,一切都会正常工作。这里的问题是没有 OnNextAsync 函数,我会在第一个 observable 中等待。做这个的最好方式是什么?

 class Program
        {
            private static Subject<int> _sub = new Subject<int>();

            static void Main(string[] args)
            {
                _sub.SelectMany(async _ =>
                {
                    Console.WriteLine("SUB START: " + _);
                    await Task.Delay(3000);
                    Console.WriteLine("SUB END: " + _);
                    return 1;
                }).Subscribe();


                Start();
            }

            public static void Start()
            {
                int count = 0;
                Observable.Interval(TimeSpan.FromSeconds(5)).Select(x =>
                {
                    Console.WriteLine("START INTERVAL");
                    _sub.OnNext(count++); //onNext is not awaitable
                    Console.WriteLine("END INTERVAL");
                    return 1;
                })
                .Do(_ => Console.WriteLine("ALL FINISHED"))
                .Subscribe();

                Console.WriteLine("READLINE");
                Console.ReadLine();
            }

        }

结果:

READLINE
START INTERVAL
SUB START: 0
END INTERVAL
ALL FINISHED
SUB END: 0

预期结果:

READLINE
START INTERVAL
SUB START: 0
SUB END: 0
END INTERVAL
ALL FINISHED

标签: c#system.reactiverx.net

解决方案


可观察对象不应依赖于其观察者的行为。

我建议你重新考虑整个事情。你正在做的事情看起来更具交互性而不是反应性。


推荐阅读