c# - 如何安全地迭代 IAsyncEnumerable 以向下游发送集合以批量处理消息
问题描述
我看过LINQ 上与 IAsyncEnumerable的聊天,这让我对处理 IAsyncEnumerables 的扩展方法有了一些了解,但坦率地说,对于现实世界的应用程序来说不够详细,尤其是就我的经验水平而言,我理解示例/ IAsyncEnumerable
s 的文档目前还不存在
我正在尝试从文件中读取,对流进行一些转换,返回 a IAsyncEnumerable
,然后在获得任意数量的对象后将这些对象发送到下游,例如:
await foreach (var data in ProcessBlob(downloadedFile))
{
//todo add data to List<T> called listWithPreConfiguredNumberOfElements
if (listWithPreConfiguredNumberOfElements.Count == preConfiguredNumber)
await _messageHandler.Handle(listWithPreConfiguredNumberOfElements);
//repeat the behaviour till all the elements in the IAsyncEnumerable returned by ProcessBlob are sent downstream to the _messageHandler.
}
到目前为止,我对此事的理解是,该await foreach
行正在处理使用Task
s(或ValueTask
s)的数据,因此我们没有预先计算。我也对使用 List 变量犹豫不决,只是对其进行长度检查,因为跨线程共享该数据似乎不是线程安全的。
我正在使用该System.Linq.Async
软件包,希望可以使用相关的扩展方法。我可以看到一些TakeWhile
以 .
任何帮助或朝着正确的方向推动将不胜感激,谢谢。
解决方案
在System.Interactive.AsyncBuffer
包中有一个操作员可以满足您的需求。
// Projects each element of an async-enumerable sequence into consecutive
// non-overlapping buffers which are produced based on element count information.
public static IAsyncEnumerable<IList<TSource>> Buffer<TSource>(
this IAsyncEnumerable<TSource> source, int count);
这个包包含像Amb
, Throw
, Catch
,等这样的操作符Defer
,Finally
它们在 Linq 中没有直接的等价物,但它们在System.Reactive中有等价物。这是因为s 在概念上比sIAsyncEnumerable
更接近s(因为两者都有时间维度,而s 是永恒的)。IObservable
IEnumerable
IEnumerable
推荐阅读
- ios - 为什么归一化对象检测结果中存在负坐标?(CoreML、Vision、Swift、Ios)
- django - 聚合以及额外的值
- paypal - Paypal REST API - 检测到重复的发票 ID
- asp.net-web-api-filters - web api 操作过滤器不调用,为什么?
- sql - 查找 SQL Server 2008 中用户的日期范围之间的差距
- python - 更改列表中列的大小写
- wordpress - 将项目从本地计算机上传到 Azure DevOps 时遇到问题
- angular - Angular 6 - 选择名称时显示动态div
- python - 如何从 .txt 文件中绘制图形(y 轴设置)?
- angular - 自定义、用户特定文档字段的 Firestore 索引