c# - 将带有 IO 操作的无限循环转换为可观察的
问题描述
您好,我正在尝试在Observable
.This 任务中转换一个长时间运行的任务(我在每次迭代时从套接字读取) CancellationToken
。
一些模型
public class Model
{
public string Data{get;set;}
}
长时间运行的任务
CancellationTokenSource cts=new CancellationTokenSource();
ClientWebSocket socket=await ConnectTheWebSocketAsync();
Task t1=Task.Run(async()=>{
while(true)
{
cts.Token.ThrowIfCancellationRequested();
byte[] rawInput = ArrayPool<byte>.Shared.Rent(bufferSize);
var rec = await socket.ReceiveAsync(rawInput, cts.Token);
Model result = rawInput.AsMemory().Slice(0, rec.Count).Decode<Model>(); //some wrapper for serialization
ArrayPool<byte>.Shared.Return(rawInput);
//i want to push this result into an observable
//Could i use an Observable.Empty and put data in it here ?
// myObs.OnNext(result);
}
},cts.Token);
可观察的尝试
我已经研究了创建这个 observable 的方法并且我偶然发现了FromAsync
:
var obs = Observable.FromAsync((cts) => {
byte[] rawInput = ArrayPool<byte>.Shared.Rent(bufferSize);
var rec = await socket.ReceiveAsync(rawInput, cts.Token);
Model result = rawInput.AsMemory().Slice(0, rec.Count).Decode<Model>(); //some wrapper for serialization
ArrayPool<byte>.Shared.Return(rawInput);
return result;
}).Repeat();
- .我不明白的是,如果我想让 observable 停止,我该怎么
CancellationToken
做Repeat
。 - .我已经看到它
FromAsync
也有一个重载CancellationToken
。我假设这个令牌将为每个元素传递,我也会将它注入套接字中ReceiveAsync
。这将使我能够控制ReceiveAsync
套接字的当前操作或正在构造的当前元素。但它不会停止Repeat
.
所以我的问题是:
Observable.FromAsync((token)=> [some io action]).Repeat();
鉴于上述模式:
- 我如何停止单个迭代(从套接字读取)
- 我如何停止整个循环(Repeat
)
- 令牌参数FromAsync
来自哪里
解决方案
推荐阅读
- javascript - 如何修复 NeDB 中的“无法插入未定义的键,它违反了唯一约束”错误?
- c# - 泛型类中的非泛型方法调用显示“类型无效”
- python - A&B 和 (A)&(B) 有什么区别?
- algorithm - 算法:T(N) =2^N
- verilog - 如何迭代传递给 SV 中的任务或函数的所有参数?
- javascript - 如何调整下拉菜单的高度
- sql - update 语句与 case 语句数据类型转换
- sql - 如何在选择查询的多列中获取函数输出
- powerbi - 通过 Azure 数据工厂刷新 Power BI 数据集
- java - 将 Parquet 转换为 avro 会引发 java.lang.IllegalArgumentException: INT96 not yet implemented 错误