c# - 如何处理异常并使用 Rx 重复
问题描述
我正在IO
使用Observable.FromAsync
.I 执行操作。我想永远重复此操作。我不明白的是如何处理异常,对它们做一些事情,然后回到我的循环中:
我试过的:
IObservable<string> ioObs=Observable.FromAsync<string>([something]); //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
onNext:x=> Console.Writeline($"Message:{x}"),
onCompleted: Console.Writeline("Completed"),
onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);
现在我不明白为什么在第一个异常之后我的可观察结束。我没有告诉它继续。
我想做的事:
- 执行 IO 动作
- 如果它抛出返回一些自定义值
- 重复循环
如果我的 observable 是可枚举的,我会想要这种行为:
public IAsyncEnumerable<string> EnumerableBehaviour()
{
while(true)
{
try
{
string data=await ReadAsync(); //the `FromAsync` delegate
yield return data;
}catch(Exception ex)
yield return "Error";
{
}
}
Repeat
即使OnError
被触发,我如何继续执行?
应该如何Observable.Catch
和Observable.Throw
结合Observable.Repeat
?
解决方案
当你有IObservable<string> ioObs = Observable.FromAsync<string>(Something);
一个 observable 时,你有一个可以返回一个值然后完成 ({OnNext}{OnCompleted}) 或者你有一个会抛出异常 ({OnError}) 的 observable。
在返回值或错误后允许源重复执行非常简单。
IObservable<string> query = ioObs.Retry().Repeat();
.Retry()
说“如果你得到一个错误再去”。.Repeat()
说“如果可观察完成,请再次订阅”。
现在这有点危险,因为您已经生成了一个可以连续执行的 observable。你需要找到一些方法来阻止它。
您的选择是:
- 处置订阅
- 取一定数量的值(即
.Take(n)
) - 设置一个
.Timeout
。 - 或使用
TakeUntil
当您的原件ioObs
在完成时返回 null 或空字符串时,最后一个很好。
你可以这样做:
IObservable<string> query = ioObs.Retry().Repeat()
.TakeUntil(x => x == null);
这是一段测试代码,您可以尝试一下:
private int __counter = 0;
Task<string> Something()
{
return Task.Run(() =>
{
if (Interlocked.Increment(ref __counter) % 7 == 0)
{
throw new Exception("Blam!");
}
return $"Hello World {__counter}";
});
}
然后这样做:
IObservable<string> ioObs = Observable.FromAsync<string>(Something);
IObservable<string> query = ioObs.Retry().Repeat()
.TakeUntil(x => x.EndsWith("19"));
当我订阅时,我得到:
你好世界 1 你好世界2 你好世界3 你好世界4 你好世界5 你好世界 6 你好世界 8 你好世界 9 你好世界 10 你好世界 11 你好世界 12 你好世界 13 你好世界 15 你好世界 16 你好世界 17 你好世界 18 你好世界 19
请注意7
和14
丢失,因为那是引发异常的时间。
推荐阅读
- javascript - 我无法制作播放音乐的不和谐机器人
- c++ - 带有静态数组的 C++ 构造函数
- python - “公平”地排序元组列表
- java - 一直报错:Table visitortable has no column named contactperson
- java - 将 dispatch.yaml 添加到每个服务?
- java - BigInteger 的 Java 基础转换
- rust - 是否可以检查 rust 中的闭包是否相等?
- python - 当我使用 ETABS OAPI 时,我在最新版本的 anaconda 中遇到问题。我收到消息“无法打开文件 \\BufferFileIn::BufferFileIn()”
- python-3.x - 熊猫数据框按天分组并在固定时间找到超过值的第一个值
- sensors - 我想将传感器数据从 MyRIO 发送到 XBee3 Mesh