首页 > 解决方案 > 如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息

问题描述

我看过LINQ 上与 IAsyncEnumerable的聊天,这让我对处理 IAsyncEnumerables 的扩展方法有了一些了解,但坦率地说,对于现实世界的应用程序来说不够详细,尤其是就我的经验水平而言,我理解示例/ IAsyncEnumerables 的文档目前还不存在

我正在尝试从文件中读取,对流进行一些转换,返回 a IAsyncEnumerable,然后在获得任意数量的对象后将这些对象发送到下游,例如:

await foreach (var data in ProcessBlob(downloadedFile))
{
    //todo add data to List<T> called listWithPreConfiguredNumberOfElements
    if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
        await _messageHandler.Handle(listWithPreConfiguredNumberOfElements);
        
    //repeat the behaviour till all the elements in the IAsyncEnumerable returned by ProcessBlob are sent downstream to the _messageHandler.
}

到目前为止,我对此事的理解是,该await foreach行正在处理使用Tasks(或ValueTasks)的数据,因此我们没有预先计算。我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享该数据似乎不是线程安全的。

我正在使用该System.Linq.Async软件包,希望可以使用相关的扩展方法。我可以看到一些TakeWhile以 .

任何帮助或朝着正确的方向推动将不胜感激,谢谢。

标签: c#.netcollectionsiteratoriasyncenumerable

解决方案


在System.Interactive.AsyncBuffer包中有一个操作员可以满足您的需求。

// Projects each element of an async-enumerable sequence into consecutive
// non-overlapping buffers which are produced based on element count information.
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
    this IAsyncEnumerable<TSource> source, int count);

这个包包含像Amb, Throw, Catch,等这样的操作符DeferFinally它们在 Linq 中没有直接的等价物,但它们在System.Reactive中有等价物。这是因为s 在概念上比sIAsyncEnumerable更接近s(因为两者都有时间维度,而s 是永恒的)。IObservableIEnumerableIEnumerable


推荐阅读