c# - 如何将一个 IEnumerable 异步转换为另一个,就像 LINQ 的 Select() 一样,但在每个转换后的项目上都使用 await?
问题描述
考虑这种情况:
class Product { }
interface IWorker
{
Task<Product> CreateProductAsync();
}
我现在得到一个IEnumerable<IWorker> workers
并且应该从中创建一个IEnumerable<Product>
我必须传递给我无法更改的其他函数:
void CheckProducts(IEnumerable<Product> products);
此方法需要访问整个IEnumerable<Product>
. 不可能细分它并调用CheckProducts
多个子集。
一个明显的解决方案是:
CheckProducts(workers.Select(worker => worker.CreateProductAsync().Result));
但这当然是阻塞的,因此这只是我最后的手段。从语法上讲,我正是需要这个,只是没有阻塞。
我不能await
在传递给的函数内部使用,Select()
因为我必须将其标记为async
,这将要求它返回 aTask
本身,而我将一无所获。最后我需要一个IEnumerable<Product>
而不是一个IEnumerable<Task<Product>>
.
重要的是要知道创造产品的工人的顺序确实很重要,他们的工作不能重叠。否则,我会这样做:
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
var tasks = workers.Select(worker => worker.CreateProductAsync());
return await Task.WhenAll(tasks);
}
但不幸的是,在我需要它们按顺序Task.WhenAll()
执行的同时并行执行一些任务。
IReadOnlyList<IWorker>
如果我有一个而不是一个,这是一种实现它的可能性IEnumerable<IWorker>
:
async Task<IEnumerable<Product>> CreateProductsAsync(IReadOnlyList<IWorker> workers)
{
var resultList = new Product[workers.Count];
for (int i = 0; i < resultList.Length; ++i)
resultList[i] = await workers[i].CreateProductAsync();
return resultList;
}
但我必须处理一个IEnumerable
,更糟糕的是,它通常非常庞大,有时甚至是无限的,永远产生工人。如果我知道它的大小合适,我会调用ToArray()
它并使用上面的方法。
最终的解决方案是:
async Task<IEnumerable<Product>> CreateProductsAsync(IEnumerable<IWorker> workers)
{
foreach (var worker in workers)
yield return await worker.CreateProductAsync();
}
但是yield
和await
不兼容,如本答案中所述。看看那个答案,这个假设对我有IAsyncEnumerator
帮助吗?C# 中是否同时存在类似的东西?
我面临的问题的摘要:
- 我有一个潜在的无尽
IEnumerable<IWorker>
- 我想
CreateProductAsync()
按照它们进来的顺序异步调用它们中的每一个 - 最后我需要一个
IEnumerable<Product>
我已经尝试过但不起作用的总结:
- 我不能使用
Task.WhenAll()
,因为它并行执行任务。 - 我无法
ToArray()
在循环中手动使用和处理该数组,因为我的序列有时是无穷无尽的。 - 我无法使用
yield return
,因为它与await
.
有人对我有解决方案或解决方法吗?否则我将不得不使用那个阻塞代码......
解决方案
IEnumerator<T> 是一个同步接口,因此如果 CheckProducts 在下一个工作人员完成创建产品之前枚举下一个产品,则阻塞是不可避免的。
不过,您可以通过在另一个线程上创建产品、将它们添加到BlockingCollection<T>并在主线程上产生它们来实现并行性:
static IEnumerable<Product> CreateProducts(IEnumerable<IWorker> workers)
{
var products = new BlockingCollection<Product>(3);
Task.Run(async () => // On the thread pool...
{
foreach (IWorker worker in workers)
{
Product product = await worker.CreateProductAsync(); // Create products serially.
products.Add(product); // Enqueue the product, blocking if the queue is full.
}
products.CompleteAdding(); // Notify GetConsumingEnumerable that we're done.
});
return products.GetConsumingEnumerable();
}
为了避免无限的内存消耗,您可以选择将队列的容量指定为 BlockingCollection<T> 的构造函数参数。我在上面的代码中使用了 3。
推荐阅读
- objective-c - 为什么带有基于谓词的期望的 XCTest 这么慢?
- javascript - 无法在 Next 中传递道具
- r - 即使我在 R 中指定源的路径后也无法获取文件
- android - 带有-prod“启用”的android上的离子慢启动
- java - Android Java:将 else-if 转换为 switch
- python - 使用tensorflwo2.0,应该如何查看tensor的值
- mongodb - 处理字段名称中的空格
- python - 安装了一个 python 库,但是(从 IDLE 运行它)每当我尝试导入它时,它都会返回一个 ModuleNotFoundError
- tensorflow - TensorFlow 在第一个 epoch 中的未知步骤
- javascript - JavaScript 模块中的事件监听器?