首页 > 解决方案 > 如何将一个 IEnumerable 异步转换为另一个,就像 LINQ 的 Select() 一样,但在每个转换后的项目上都使用 await?

问题描述

考虑这种情况:

class Product { }

interface IWorker
{
    Task<Product> CreateProductAsync();
}

我现在得到一个IEnumerable<IWorker> workers并且应该从中创建一个IEnumerable<Product>我必须传递给我无法更改的其他函数:

void CheckProducts(IEnumerable<Product> products);

此方法需要访问整个IEnumerable<Product>. 不可能细分它并调用CheckProducts多个子集。

一个明显的解决方案是:

CheckProducts(workers.Select(worker => worker.CreateProductAsync().Result));

但这当然是阻塞的,因此这只是我最后的手段。从语法上讲,我正是需要这个,只是没有阻塞。

我不能await在传递给的函数内部使用,Select()因为我必须将其标记为async,这将要求它返回 aTask本身,而我将一无所获。最后我需要一个IEnumerable<Product>而不是一个IEnumerable<Task<Product>>.

重要的是要知道创造产品的工人的顺序确实很重要,他们的工作不能重叠。否则,我会这样做:

async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
    var tasks = workers.Select(worker => worker.CreateProductAsync());
    return await Task.WhenAll(tasks);
}

但不幸的是,在我需要它们按顺序Task.WhenAll()执行的同时并行执行一些任务。

IReadOnlyList<IWorker>如果我有一个而不是一个,这是一种实现它的可能性IEnumerable<IWorker>

async Task<IEnumerable<Product>> CreateProductsAsync(IReadOnlyList<IWorker> workers)
{
    var resultList = new Product[workers.Count];
    for (int i = 0; i < resultList.Length; ++i)
        resultList[i] = await workers[i].CreateProductAsync();
    return resultList;
}

但我必须处理一个IEnumerable,更糟糕的是,它通常非常庞大,有时甚至是无限的,永远产生工人。如果我知道它的大小合适,我会调用ToArray()它并使用上面的方法。

最终的解决方案是:

async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
    foreach (var worker in workers)
        yield return await worker.CreateProductAsync();
}

但是yieldawait不兼容,如本答案中所述。看看那个答案,这个假设对我有IAsyncEnumerator帮助吗?C# 中是否同时存在类似的东西?

我面临的问题的摘要:

我已经尝试过但不起作用的总结:

有人对我有解决方案或解决方法吗?否则我将不得不使用那个阻塞代码......

标签: c#asynchronousasync-awaitienumerable

解决方案


IEnumerator<T> 是一个同步接口,因此如果 CheckProducts 在下一个工作人员完成创建产品之前枚举下一个产品,则阻塞是不可避免的。

不过,您可以通过在另一个线程上创建产品、将它们添加到BlockingCollection<T>并在主线程上产生它们来实现并行性:

static IEnumerable<Product> CreateProducts(IEnumerable<IWorker> workers)
{
    var products = new BlockingCollection<Product>(3);

    Task.Run(async () => // On the thread pool...
    {
        foreach (IWorker worker in workers)
        {
            Product product = await worker.CreateProductAsync(); // Create products serially.
            products.Add(product); // Enqueue the product, blocking if the queue is full.
        }

        products.CompleteAdding(); // Notify GetConsumingEnumerable that we're done.
    });

    return products.GetConsumingEnumerable();
}

为了避免无限的内存消耗,您可以选择将队列的容量指定为 BlockingCollection<T> 的构造函数参数。我在上面的代码中使用了 3。


推荐阅读