system.reactive - Rx.Net - 订阅 Cold Observable 时,发布方法缺少前几项
问题描述
受Akavache的启发,我正在尝试创建一个为我提供IObservable<IArticle>
. 该方法本质上首先尝试获取数据库中存在的所有文章,然后尝试从 Web 服务获取更新的文章,并在从 Web 服务获取最新文章时尝试将它们保存回数据库。
由于 web 服务本质上是一个冷的 observable 并且我不想订阅两次,所以我曾经Publish
连接到它。我的理解是我使用的是该Publish
方法的正确版本,但是,很多时候该方法往往会错过GetNewsArticles
. 这是通过 UI 以及Trace
在下面的调用中添加的调用观察到的。
除了解决问题之外,了解如何调试/测试此代码(除了将 DI 引入到 inject 之外NewsService
)也很棒。
public IObservable<IArticle> GetContents(string newsUrl, IScheduler scheduler)
{
var newsService = new NewsService(new HttpClient());
scheduler = scheduler ?? TaskPoolScheduler.Default;
var fetchObject = newsService
.GetNewsArticles(newsUrl,scheduler)
.Do(x => Trace.WriteLine($"Parsing Articles {x.Title}"));
return fetchObject.Publish(fetchSubject =>
{
var updateObs = fetchSubject
.Do( x =>
{
// Save to database, all sync calls
})
.Where(x => false)
.Catch(Observable.Empty<Article>());
var dbArticleObs = Observable.Create<IArticle>(o =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
using (var session = dataBase.GetSession())
{
var articles = await session.GetArticlesAsync(newsUrl, ct);
foreach (var article in articles)
{
o.OnNext(article);
}
}
o.OnCompleted();
});
});
return
dbArticleObs // First get all the articles from dataBase cache
.Concat(fetchSubject // Get the latest articles from web service
.Catch(Observable.Empty<Article>())
.Merge(updateObs)) // Update the database with latest articles
.Do(x => Trace.WriteLine($"Displaying {x.Title}"));
});
}
更新 - 添加了 GetArticles
public IObservable<IContent> GetArticles(string feedUrl, IScheduler scheduler)
{
return Observable.Create<IContent>(o =>
{
scheduler = scheduler ?? DefaultScheduler.Instance;
scheduler.ScheduleAsync(async (ctrl, ct) =>
{
try
{
using (var inputStream = await Client.GetStreamAsync(feedUrl))
{
var settings = new XmlReaderSettings
{
IgnoreComments = true,
IgnoreProcessingInstructions = true,
IgnoreWhitespace = true,
Async = true
};
//var parsingState = ParsingState.Channel;
Article article = null;
Feed feed = null;
using (var reader = XmlReader.Create(inputStream, settings))
{
while (await reader.ReadAsync())
{
ct.ThrowIfCancellationRequested();
if (reader.IsStartElement())
{
switch (reader.LocalName)
{
...
// parsing logic goes here
...
}
}
else if (reader.LocalName == "item" &&
reader.NodeType == XmlNodeType.EndElement)
{
o.OnNext(article);
}
}
}
o.OnCompleted();
}
}
catch (Exception e)
{
o.OnError(e);
}
});
return Disposable.Empty;
});
}
更新 2
在此处共享源代码的链接。
解决方案
关于您的代码,我有一些不喜欢的地方。我假设NewsService
是一个IDisposable
,因为它需要一个HttpClient
(这是一次性的)。你没有进行适当的清理。
此外,您还没有提供完整的方法 - 因为您已经尝试将其缩减为问题 - 但这使得很难推断如何重写代码。
也就是说,让我觉得看起来很可怕的一件事是Observable.Create
. 你能试试这个代码,看看它是否对你有用吗?
var dbArticleObs =
Observable
.Using(
() => dataBase.GetSession(),
session =>
from articles in Observable.FromAsync(ct => session.GetArticlesAsync(newsUrl, ct))
from article in articles
select article);
现在,如果确实如此,请尝试fetchObject
在Observable.Using
更新 `NewService.
无论如何,如果您可以提供完整的实现GetContents
,NewsService
以及您dataBase
的问题中的代码,那将是很好的。
推荐阅读
- java - Selenium 截图性能
- ruby-on-rails - 为什么当代码在 Ruby on Rails 的断点处运行时,VSCode 远程调试会话不参与?
- html - 条形图减少条形之间的空间
- c# - 为什么不是列表中不包含未从列表中删除的字符串的所有项目?
- python - Python:尝试从文件导入模块时出现“ModuleNotFoundError”
- android-studio - 有什么方法可以为微调器添加边框?
- gitlab - 使用 Gitlab API 创建管道。获得 500
- microsoft-teams - MS Teams 中的自适应卡片模板
- angular - 用于以 xlsx 格式上传文件的 Angular 单元测试用例
- python - 从 azure rest api 上传证书失败