c# - 将 IAsyncEnumerable 与 Dapper 一起使用
问题描述
我们最近将我们使用的 ASP.NET Core API 迁移Dapper
到 .NET Core 3.1。迁移后,我们觉得有机会为我们的一个端点使用最新IAsyncEnumerable
功能。C# 8
这是更改前的伪代码:
public async Task<IEnumerable<Item>> GetItems(int id)
{
var reader = await _connection.QueryMultipleAsync(getItemsSql,
param: new
{
Id = id
});
var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
if (idFromDb == null)
{
return null;
}
var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
return Stream(reader, items);
}
private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
using (reader)
{
foreach (var item in items)
{
yield return item;
}
}
}
IAsyncEnumerable
代码更改后:
// Import Nuget pacakage: System.Linq.Async
public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
var reader = await _connection.QueryMultipleAsync(getItemsSql,
param: new
{
Id = id
});
var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
if (idFromDb == null)
{
return null;
}
var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);
return Stream(reader, items);
}
private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
using (reader)
{
await foreach (var item in items.ToAsyncEnumerable())
{
yield return item;
}
}
}
上面的使用方法是从这篇文章ToAsyncEnumerable
中获得灵感的,但我不能 100% 确定我是否在正确的地方/上下文中使用它。
问题:
- dapper 库只返回
IEnumerable
,但我们可以用ToAsyncEnumerable
它把它转换成上面IAsyncEnumerable
的 forasync
stream
吗?
注意:这个问题看起来类似于如果与 async/await 一起使用,返回 IEnumerable 会发生什么(使用 Dapper 从 SQL Server 流式传输数据)?但我认为这不能回答我的问题。
解决方案
更新:当我第一次写这个答案时,我不知道异步迭代器。感谢 Theodor Zoulias 指出这一点。有鉴于此,一种更简单的方法是可能的:
using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();
// Consider using reader.NextResultAsync(). Follow github issue for details:
while (await reader.ReadAsync()) {
yield return rowParser(reader);
}
参考:https ://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322
原答案:
这是IAsyncEnumerable
我写的一个包装器,它可以帮助那些想要使用 async/await 流式传输无缓冲数据并且还想要 Dapper 类型映射的力量的人:
public class ReaderParser<T> : IAsyncEnumerable<T> {
public ReaderParser(SqlDataReader reader) {
Reader = reader;
}
private SqlDataReader Reader { get; }
public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
return new ReaderParserEnumerator<T>(Reader);
}
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
public ReaderParserEnumerator(SqlDataReader reader) {
Reader = reader;
RowParser = reader.GetRowParser<T>();
}
public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
private SqlDataReader Reader { get; }
private Func<IDataReader, T> RowParser { get; }
public async ValueTask DisposeAsync() {
await Reader.DisposeAsync();
}
public async ValueTask<bool> MoveNextAsync() {
return await Reader.ReadAsync();
}
}
用法:
var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);
然后,包System.Linq.Async
基本上添加了所有IEnumerable
你知道和喜欢的很好的扩展,例如在我的使用中:
var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();
推荐阅读
- python - python 正在显示在 Windows 中键入 python -v 时要安装的东西的列表
- ios - 快速比较发布时间和当前时间
- msal - MSAL - 刷新令牌到期日期
- google-sheets - 如何避免使用公式或动态数据重写谷歌表格单元格?
- protocols - 如何确定 KNX 设备是否能够发送扩展帧?
- oauth-2.0 - Xero OAuth2 问题
- nic - 对于 XDP-eBPF 上的 ipv6-Srv6 数据包,rx_queue_index 始终返回值 1
- python - 在python中的格式化方法中使用大写方法
- python-3.x - 将所有嵌套的 xml InfoTables 转换为 Python 字典
- java - 如何从firebase数据库中以文本形式显示我的数据实时我的代码和我的XML在