首页 > 解决方案 > 如何创建一个既是任务的类和一个 IObservable?

问题描述

最近我遇到了一种情况,将异步操作同时表示为 aTask<T>和 asIObservable<T>将是有利的。任务表示维护操作的状态(IsCompleted等),而可观察表示允许以有趣的方式( 、等)IsFaulted组合多个操作,自动处理取消订阅的任何操作的取消,解决了这个问题方式解决了触发后被遗忘的异步操作的问题。所以我对结合这两种表示的方法产生了兴趣。ConcatMergeSwitch

组合它们的简单且可能正确的方法是通过组合:创建一个在内部存储 aTask<T>和 an的类型IObservable<T>,并将它们作为它的两个属性公开。但是在这个问题中,我对既是aTask<T>a的类型的具有挑战性且可能不切实际的可能性感兴趣IObservable<T>。一种可以直接传递给接受任务或可观察的 API 的类型,并且在任何一种情况下都可以做正确的事情。所以它不能只是一个类似任务的对象。它必须继承自真实的东西,即Task<T>类本身。像这样的东西:

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
    {
        //...
    }
}

创建AsyncOperation实例应立即调用提供的操作。换句话说,一个AsyncOperation应该代表一个热门任务/可观察的组合。

是否可以创建这样的类型?

顺便说一句,这里是ReactiveX/RxJava库中的一个线程,它证明其他人之前已经考虑过这个问题:Observable 上没有“isCompleted”或“isErrored”方法

标签: c#task-parallel-librarysystem.reactiverx.net

解决方案


我找到了一种方法来创建一个继承自的可观察对象,Task方法是使用@GlennSlayden 在答案中描述的天才技术。

public class AsyncOperation<TResult> : Task<TResult>, IObservable<TResult>
{
    private readonly IObservable<TResult> _observable;
    private readonly Task<TResult> _promise;

    private AsyncOperation(Func<TResult> function) : base(() => function())
        => function = this.GetResult;

    private TResult GetResult() => _promise.GetAwaiter().GetResult();

    public AsyncOperation(Func<CancellationToken, Task<TResult>> action)
        : this((Func<TResult>)null)
    {
        _observable = Observable.StartAsync(action, Scheduler.Immediate);
        _promise = _observable.ToTask();
        _promise.ContinueWith(_ => base.RunSynchronously(), default,
            TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
    }

    IDisposable IObservable<TResult>.Subscribe(IObserver<TResult> observer)
        => _observable.Subscribe(observer);
}

上述解决方案并不完美,因为派生类的实例永远无法转换到Canceled状态。这是一个我不知道如何解决的问题,它可能无法解决,但它可能不是很重要。取消以 的形式出现TaskCanceledException,处理此异常是处理取消任务的正常方式。

有趣的是,可以通过创建一个虚拟订阅并处理它来取消异步操作:

var operation = new AsyncOperation<TResult>(async cancellationToken => { /* ... */ });

operation.Subscribe(_ => { }, _ => { }).Dispose(); // Cancels the cancellationToken

我对这门课做了一些实验,发现它不像我最初想象的那样实用。问题是存在许多同时支持任务和可观察对象的 API,并且在其他方​​面是相同的(例如ConcatMergeSwitchWait)。这导致编译时错误(CS0121模糊调用)的频繁出现。可以通过将类型转换为任务或可观察来解决歧义,但这很尴尬,并且首先否定了组合这两种类型的整个目的。


澄清:该行_promise.GetAwaiter().GetResult()乍一看可能表明此实现阻塞了一个ThreadPool线程。情况并非如此,因为底座Task最初是冷的,只有在_promise完成后才会变暖。


推荐阅读