首页 > 解决方案 > 将 IAsyncEnumerable 与 Dapper 一起使用

问题描述

我们最近将我们使用的 ASP.NET Core API 迁移Dapper到 .NET Core 3.1。迁移后,我们觉得有机会为我们的一个端点使用最新IAsyncEnumerable功能。C# 8

这是更改前的伪代码:

public async Task<IEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
       return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
        foreach (var item in items)
        {
            yield return item;
        }
    }     
}

IAsyncEnumerable代码更改后:

// Import Nuget pacakage: System.Linq.Async

public async Task<IAsyncEnumerable<Item>> GetItems(int id)
{
    var reader = await _connection.QueryMultipleAsync(getItemsSql,
       param: new
       {
           Id = id
       });

    var idFromDb = (await reader.ReadAsync<int?>().ConfigureAwait(false)).SingleOrDefault();
    if (idFromDb == null)
    {
        return null;
    }

    var items = await reader.ReadAsync<Item>(buffered: false).ConfigureAwait(false);

    return Stream(reader, items);
} 

private IAsyncEnumerable<Item> Stream(SqlMapper.GridReader reader, IEnumerable<Item> items)
{
    using (reader)
    {
       await foreach (var item in items.ToAsyncEnumerable())
       {
           yield return item;
       }
    }
 }

上面的使用方法是从这篇文章ToAsyncEnumerable中获得灵感的,但我不能 100% 确定我是否在正确的地方/上下文中使用它。

问题:

注意:这个问题看起来类似于如果与 async/await 一起使用,返回 IEnumerable 会发生什么(使用 Dapper 从 SQL Server 流式传输数据)?但我认为这不能回答我的问题。

标签: c#async-awaitdapperiasyncenumerable

解决方案


更新:当我第一次写这个答案时,我不知道异步迭代器。感谢 Theodor Zoulias 指出这一点。有鉴于此,一种更简单的方法是可能的:

using var reader = await connection.ExecuteReaderAsync(query, parameters);
var rowParser = reader.GetRowParser<T>();

// Consider using reader.NextResultAsync(). Follow github issue for details:

while (await reader.ReadAsync()) {
    yield return rowParser(reader);
}

参考:https ://github.com/DapperLib/Dapper/issues/1239#issuecomment-1035507322

原答案:

这是IAsyncEnumerable我写的一个包装器,它可以帮助那些想要使用 async/await 流式传输无缓冲数据并且还想要 Dapper 类型映射的力量的人:

public class ReaderParser<T> : IAsyncEnumerable<T> {
    public ReaderParser(SqlDataReader reader) {
        Reader = reader;
    }
    private SqlDataReader Reader { get; }
    public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) {
        return new ReaderParserEnumerator<T>(Reader);
    }
}
public class ReaderParserEnumerator<T> : IAsyncEnumerator<T> {
    public ReaderParserEnumerator(SqlDataReader reader) {
        Reader = reader;
        RowParser = reader.GetRowParser<T>();
    }
    public T Current => Reader.FieldCount == 0 ? default(T) : RowParser(Reader);
    private SqlDataReader Reader { get; }
    private Func<IDataReader, T> RowParser { get; }
    public async ValueTask DisposeAsync() {
        await Reader.DisposeAsync();
    }
    public async ValueTask<bool> MoveNextAsync() {
        return await Reader.ReadAsync();
    }
}

用法:

var reader = await command.ExecuteReaderAsync();
return new ReaderParser<T>(reader);

然后,包System.Linq.Async基本上添加了所有IEnumerable你知道和喜欢的很好的扩展,例如在我的使用中:

var streamData = await repo.GetDataStream();
var buffer = await streamData.Take(BATCH_SIZE).ToListAsync();

推荐阅读