首页 > 解决方案 > Rx.Net - 订阅 Cold Observable 时,发布方法缺少前几项

问题描述

Akavache的启发,我正在尝试创建一个为我提供IObservable<IArticle>. 该方法本质上首先尝试获取数据库中存在的所有文章,然后尝试从 Web 服务获取更新的文章,并在从 Web 服务获取最新文章时尝试将它们保存回数据库。

由于 web 服务本质上是一个冷的 observable 并且我不想订阅两次,所以我曾经Publish连接到它。我的理解是我使用的是该Publish方法的正确版本,但是,很多时候该方法往往会错过GetNewsArticles. 这是通过 UI 以及Trace在下面的调用中添加的调用观察到的。

除了解决问题之外,了解如何调试/测试此代码(除了将 DI 引入到 inject 之外NewsService)也很棒。

public IObservable<IArticle> GetContents(string newsUrl, IScheduler scheduler)
{
    var newsService = new NewsService(new HttpClient());
    scheduler = scheduler ?? TaskPoolScheduler.Default;

    var fetchObject = newsService
        .GetNewsArticles(newsUrl,scheduler)
        .Do(x => Trace.WriteLine($"Parsing Articles {x.Title}"));

    return fetchObject.Publish(fetchSubject =>
    {
        var updateObs = fetchSubject
            .Do( x =>                         
            {
                // Save to database, all sync calls
            })
            .Where(x => false)
            .Catch(Observable.Empty<Article>());

        var dbArticleObs = Observable.Create<IArticle>(o =>
        {
            return scheduler.ScheduleAsync(async (ctrl, ct) =>
            {
                using (var session = dataBase.GetSession())
                {
                    var articles = await session.GetArticlesAsync(newsUrl, ct);
                    foreach (var article in articles)
                    {
                        o.OnNext(article);
                    }
                }
                o.OnCompleted();
            });
        });

        return
            dbArticleObs                // First get all the articles from dataBase cache
                .Concat(fetchSubject    // Get the latest articles from web service 
                    .Catch(Observable.Empty<Article>())
                    .Merge(updateObs))  // Update the database with latest articles
                .Do(x => Trace.WriteLine($"Displaying {x.Title}"));
    });
}

更新 - 添加了 GetArticles

public IObservable<IContent> GetArticles(string feedUrl, IScheduler scheduler)
{
    return Observable.Create<IContent>(o =>
    {
        scheduler = scheduler ?? DefaultScheduler.Instance;
        scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            try
            {
                using (var inputStream = await Client.GetStreamAsync(feedUrl))
                {
                    var settings = new XmlReaderSettings
                    {
                        IgnoreComments = true,
                        IgnoreProcessingInstructions = true,
                        IgnoreWhitespace = true,
                        Async = true
                    };

                    //var parsingState = ParsingState.Channel;
                    Article article = null;
                    Feed feed = null;

                    using (var reader = XmlReader.Create(inputStream, settings))
                    {
                        while (await reader.ReadAsync())
                        {
                            ct.ThrowIfCancellationRequested();
                            if (reader.IsStartElement())
                            {
                                switch (reader.LocalName)
                                {
                                    ...
                                    // parsing logic goes here
                                    ...
                                }
                            }
                            else if (reader.LocalName == "item" &&
                                     reader.NodeType == XmlNodeType.EndElement)
                            {
                                o.OnNext(article);
                            }
                        }
                    }

                    o.OnCompleted();
                }
            }
            catch (Exception e)
            {
                o.OnError(e);
            }

        });
        return Disposable.Empty;
    });
}
更新 2

在此处共享源代码的链接。

标签: system.reactive

解决方案


关于您的代码,我有一些不喜欢的地方。我假设NewsService是一个IDisposable,因为它需要一个HttpClient(这是一次性的)。你没有进行适当的清理。

此外,您还没有提供完整的方法 - 因为您已经尝试将其缩减为问题 - 但这使得很难推断如何重写代码。

也就是说,让我觉得看起来很可怕的一件事是Observable.Create. 你能试试这个代码,看看它是否对你有用吗?

    var dbArticleObs =
        Observable
            .Using(
                () => dataBase.GetSession(),
                session =>
                    from articles in Observable.FromAsync(ct => session.GetArticlesAsync(newsUrl, ct))
                    from article in articles
                    select article);

现在,如果确实如此,请尝试fetchObjectObservable.Using更新 `NewService.

无论如何,如果您可以提供完整的实现GetContentsNewsService以及您dataBase的问题中的代码,那将是很好的。


推荐阅读