首页 > 解决方案 > System.Threading.Channels ReadAsync() 方法正在阻塞执行

问题描述

概述

我正在尝试IAsyncEnumerable<T>围绕接口编写一个包装器IObserver<T>。起初我使用 aBufferBlock<T>作为后备数据存储,但通过性能测试和研究发现它实际上是一个非常慢的类型,所以我决定System.Threading.Channels.Channel试一试。我的 BufferBlock 实现有一个与这个类似的问题,但这次我不知道如何解决它。

问题

如果我的方法尚未写入,我GetAsyncEnumerator()的循环会被await _channel.Reader.WaitToRead(token)调用阻塞。在不阻塞程序执行的情况下等待一个值在此上下文中可用的正确方法是什么?IObserver<T>.OnNext()_channel

执行

public sealed class ObserverAsyncEnumerableWrapper<T> : IAsyncEnumerable<T>, IObserver<T>, IDisposable
{
    private readonly IDisposable _unsubscriber;
    private readonly Channel<T> _channel = Channel.CreateUnbounded<T>();

    private bool _producerComplete;

    public ObserverAsyncEnumerableWrapper(IObservable<T> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    public async void OnNext(T value)
    {
        Log.Logger.Verbose("Adding value to Channel.");
        await _channel.Writer.WriteAsync(value);
    }

    public void OnError(Exception error)
    {
        _channel.Writer.Complete(error);
    }

    public void OnCompleted()
    {
        _producerComplete = true;
    }

    public async IAsyncEnumerator<T> GetAsyncEnumerator([EnumeratorCancellation] CancellationToken token = new CancellationToken())
    {
        Log.Logger.Verbose("Starting async iteration...");
        while (await _channel.Reader.WaitToReadAsync(token) || !_producerComplete)
        {
            Log.Logger.Verbose("Reading...");
            while (_channel.Reader.TryRead(out var item))
            {
                Log.Logger.Verbose("Yielding item.");
                yield return item;
            }
            Log.Logger.Verbose("Awaiting more items.");
        }
        Log.Logger.Verbose("Iteration Complete.");
        _channel.Writer.Complete();
    }

    public void Dispose()
    {
        _channel.Writer.Complete();
        _unsubscriber?.Dispose();
    }
}

在此处输入图像描述

附加上下文

没关系,但在运行时IObservable<T>传递给构造函数的实例是CimAsyncResult从对 api 进行的异步调用返回的Microsoft.Management.Infrastructure。这些使用了我试图用花哨的新异步枚举模式包装的 Observer 设计模式。

编辑

更新了对调试器输出的日志记录,并OnNext()按照一位评论者的建议使我的方法异步/等待。你可以看到它永远不会进入while()循环。

标签: asynchronous.net-coreiasyncenumerable

解决方案


在调用堆栈的上方,我通过 GetAwaiter().GetResult() 方法同步调用异步方法。

是的,这是个问题

我这样做是因为有一次我想从构造函数中获取数据。我更改了该实现以使用 Task.Run() 执行调用,现在迭代器可以在两种实现中完美运行。

有比阻塞异步代码更好的解决方案。使用Task.Run是避免死锁的一种方法,但您最终仍会获得低于标准的用户体验(我假设您的用户体验是一个 UI 应用程序,因为有一个SynchronizationContext)。

如果使用异步枚举器加载数据进行显示,那么更合适的解决方案是(同步)将 UI 初始化为“正在加载...”状态,然后在异步加载数据时更新该状态。如果异步枚举器用于其他用途,您可能会在我的异步构造函数博客文章中找到一些合适的替代模式。


推荐阅读