c# - 如何创建一个既是任务的类和一个 IObservable?
问题描述
最近我遇到了一种情况,将异步操作同时表示为 aTask<T>
和 asIObservable<T>
将是有利的。任务表示维护操作的状态(IsCompleted
等),而可观察表示允许以有趣的方式( 、等)IsFaulted
组合多个操作,自动处理取消订阅的任何操作的取消,解决了这个问题方式解决了触发后被遗忘的异步操作的问题。所以我对结合这两种表示的方法产生了兴趣。Concat
Merge
Switch
组合它们的简单且可能正确的方法是通过组合:创建一个在内部存储 aTask<T>
和 an的类型IObservable<T>
,并将它们作为它的两个属性公开。但是在这个问题中,我对既是aTask<T>
又是a的类型的具有挑战性且可能不切实际的可能性感兴趣IObservable<T>
。一种可以直接传递给接受任务或可观察的 API 的类型,并且在任何一种情况下都可以做正确的事情。所以它不能只是一个类似任务的对象。它必须继承自真实的东西,即Task<T>
类本身。像这样的东西:
public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
{
//...
}
}
创建AsyncOperation
实例应立即调用提供的操作。换句话说,一个AsyncOperation
应该代表一个热门任务/可观察的组合。
是否可以创建这样的类型?
顺便说一句,这里是ReactiveX/RxJava库中的一个线程,它证明其他人之前已经考虑过这个问题:Observable 上没有“isCompleted”或“isErrored”方法
解决方案
我找到了一种方法来创建一个继承自的可观察对象,Task
方法是使用@GlennSlayden 在此答案中描述的天才技术。
public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
private readonly IObservable<TResult> _observable;
private readonly Task<TResult> _promise;
private AsyncOperation(Func<TResult> function) : base(() => function())
=> function = this.GetResult;
private TResult GetResult() => _promise.GetAwaiter().GetResult();
public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
: this((Func<TResult>)null)
{
_observable = Observable.StartAsync(action, Scheduler.Immediate);
_promise = _observable.ToTask();
_promise.ContinueWith(_ => base.RunSynchronously(), default,
TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
}
IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
=> _observable.Subscribe(observer);
}
上述解决方案并不完美,因为派生类的实例永远无法转换到Canceled
状态。这是一个我不知道如何解决的问题,它可能无法解决,但它可能不是很重要。取消以 的形式出现TaskCanceledException
,处理此异常是处理取消任务的正常方式。
有趣的是,可以通过创建一个虚拟订阅并处理它来取消异步操作:
var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });
operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken
我对这门课做了一些实验,发现它不像我最初想象的那样实用。问题是存在许多同时支持任务和可观察对象的 API,并且在其他方面是相同的(例如Concat
、Merge
、Switch
等Wait
)。这导致编译时错误(CS0121模糊调用)的频繁出现。可以通过将类型转换为任务或可观察来解决歧义,但这很尴尬,并且首先否定了组合这两种类型的整个目的。
澄清:该行_promise.GetAwaiter().GetResult()
乍一看可能表明此实现阻塞了一个ThreadPool
线程。情况并非如此,因为底座Task
最初是冷的,只有在_promise
完成后才会变暖。
推荐阅读
- docker - 如何在 localhost 中查看 docker 镜像
- ios - 在 swift 5 中以编程方式更改比例约束
- ruby-on-rails - 添加唯一索引后rails rspec重复键值违反唯一约束
- postgresql - 列引用在 Postgresql 查询中是不明确的,即使子查询明确命名了该列
- python - 如何使用格式填充空格
- sql - sql:从每个组中删除最旧的n条记录
- javascript - 使用 SignalR 获取角色中的用户列表
- visual-studio-code - VS 代码编辑器.autoIndent
- python-3.x - pd.Series(pred).value_counts() 如何获取数据框中的第一列?
- r - 用 R-base 中的值标记的条形标题