首页 > 解决方案 > 可观察对象(Observable)无法完成

问题描述

我编写了一个带有状态控制(处理中 / 已停止 / 已暂停)的异步文件读取器。我需要在所有文件都读取完毕时得到通知,但是可观察对象 reader 永远不会完成(在调用 reader.Wait() 之后始终收不到“完成”通知)。你能帮我理解这是为什么吗?应该如何手动让它完成?

class AsyncReader
    {
        public enum States { Processing, Stopped, Paused};

        private Subject<string[]> filesProvider = new Subject<string[]>();

        private Subject<States> state = new Subject<States>();

        public void Run()
        {
            state.OnNext(States.Processing);
        }

        public IObservable<KeyValuePair<string, string>> GetDataSource()
        {
            return filesProvider.Select(files => ReadFiles(files, state)).Switch();
        }

        public AsyncReader(string[] args)
        {
            var reader = GetDataSource();
            Observable.Start(() =>
            {
                reader.Wait();
                Console.WriteLine("Done");
            });
            reader.Subscribe(line =>
            {
                Console.WriteLine(line);
            });
            filesProvider.OnNext(args);
        }

        public static IObservable<KeyValuePair<string, string>> ReadFile(string filePath, IObservable<States> rState) =>      
            rState.Where(state => state == States.Processing)
            .SelectMany(_ =>
            Observable
            .Using(
                () => new StreamReader(filePath),
                reader =>
                    Observable
                        .Defer(
                            () =>
                                Observable
                                    .FromAsync(reader.ReadLineAsync))
                        .Repeat()
                        .TakeWhile(line => line != null)
                        .Select(line => new KeyValuePair<string, string>(filePath, line))));

        public static IObservable<KeyValuePair<string, string>> ReadFiles(string[] files, IObservable<States> readState)
        {
            IObservable<KeyValuePair<string, string>> dataSource = Observable.Empty<KeyValuePair<string, string>>();
            foreach (var file in files)
            {
                dataSource = dataSource.Concat(ReadFile(file, readState));
            }
            return dataSource;
        }
    }

简短的使用示例:

class Program
    {
        static void Main(string[] args)
        {
            AsyncReader reader = new AsyncReader(args);
            reader.Run();
            Console.ReadKey();
        }
    }

标签: c#, observable, system.reactive, reactiveui

解决方案


推荐阅读