首页 > 解决方案 > 如何处理异常并使用 Rx 重复

问题描述

我正在IO使用Observable.FromAsync.I 执行操作。我想永远重复此操作。我不明白的是如何处理异常,对它们做一些事情,然后回到我的循环中:

我试过的:

IObservable<string> ioObs=Observable.FromAsync<string>([something]);  //at each iteration i do an io operation (reading from a socket);
IObservable<string> loop=Observable.Catch(ioObs).Repeat();
loop.Subscribe(
  onNext:x=> Console.Writeline($"Message:{x}"),
  onCompleted: Console.Writeline("Completed"),
  onError: ex=>Console.Writeline($"Error\tReason:{ex.Message}")
);

现在我不明白为什么在第一个异常之后我的可观察结束。我没有告诉它继续。

我想做的事:

如果我的 observable 是可枚举的,我会想要这种行为:

public IAsyncEnumerable<string> EnumerableBehaviour()
{
   while(true)
   {
      try
      {
        string data=await ReadAsync();  //the `FromAsync` delegate
        yield return data;
      }catch(Exception ex)
          yield return "Error";
      {
   }
}

Repeat即使OnError被触发,我如何继续执行?

应该如何Observable.CatchObservable.Throw结合Observable.Repeat

标签: c#observablesystem.reactive

解决方案


当你有IObservable<string> ioObs = Observable.FromAsync<string>(Something);一个 observable 时,你有一个可以返回一个值然后完成 ({OnNext}{OnCompleted}) 或者你有一个会抛出异常 ({OnError}) 的 observable。

在返回值或错误后允许源重复执行非常简单。

IObservable<string> query = ioObs.Retry().Repeat();

.Retry()说“如果你得到一个错误再去”。.Repeat()说“如果可观察完成,请再次订阅”。

现在这有点危险,因为您已经生成了一个可以连续执行的 observable。你需要找到一些方法来阻止它。

您的选择是:

  • 处置订阅
  • 取一定数量的值(即.Take(n)
  • 设置一个.Timeout
  • 或使用TakeUntil

当您的原件ioObs在完成时返回 null 或空字符串时,最后一个很好。

你可以这样做:

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x == null);
        

这是一段测试代码,您可以尝试一下:

private int __counter = 0;
Task<string> Something()
{
    return Task.Run(() =>
    {
        if (Interlocked.Increment(ref __counter) % 7 == 0)
        {
            throw new Exception("Blam!");
        }
        return $"Hello World {__counter}";
    });
}

然后这样做:

IObservable<string> ioObs = Observable.FromAsync<string>(Something);

IObservable<string> query = ioObs.Retry().Repeat()
        .TakeUntil(x => x.EndsWith("19"));
        

当我订阅时,我得到:

你好世界 1
你好世界2
你好世界3
你好世界4
你好世界5
你好世界 6
你好世界 8
你好世界 9
你好世界 10
你好世界 11
你好世界 12
你好世界 13
你好世界 15
你好世界 16
你好世界 17
你好世界 18
你好世界 19

请注意714丢失,因为那是引发异常的时间。


推荐阅读