首页 > 解决方案 > 将带有 IO 操作的无限循环转换为可观察的

问题描述

您好,我正在尝试在Observable.This 任务中转换一个长时间运行的任务(我在每次迭代时从套接字读取) CancellationToken

一些模型

public class Model
{
   public string Data{get;set;}
}

长时间运行的任务

    CancellationTokenSource cts=new CancellationTokenSource();
    ClientWebSocket socket=await ConnectTheWebSocketAsync();
    Task t1=Task.Run(async()=>{
       while(true)
       {
          cts.Token.ThrowIfCancellationRequested();
          byte[] rawInput = ArrayPool<byte>.Shared.Rent(bufferSize);
          var rec = await socket.ReceiveAsync(rawInput, cts.Token);
          Model result = rawInput.AsMemory().Slice(0, rec.Count).Decode<Model>();  //some wrapper for serialization
          ArrayPool<byte>.Shared.Return(rawInput);
          //i want to push this result into an observable
          //Could i use an Observable.Empty and put data in it here ?
         // myObs.OnNext(result);
       }
    },cts.Token);

可观察的尝试

我已经研究了创建这个 observable 的方法并且我偶然发现了FromAsync

 var obs = Observable.FromAsync((cts) => {
                byte[] rawInput = ArrayPool<byte>.Shared.Rent(bufferSize);
                var rec = await socket.ReceiveAsync(rawInput, cts.Token);
                Model result = rawInput.AsMemory().Slice(0, rec.Count).Decode<Model>();  //some wrapper for serialization
                ArrayPool<byte>.Shared.Return(rawInput);
                return result;

            }).Repeat();

所以我的问题是:

Observable.FromAsync((token)=> [some io action]).Repeat();

鉴于上述模式:
- 我如何停止单个迭代(从套接字读取)
- 我如何停止整个循环(Repeat
- 令牌参数FromAsync来自哪里

标签: c#.net-coreioobservablesystem.reactive

解决方案


推荐阅读