首页 > 解决方案 > 访问内存的底层数组

问题描述

在我的应用程序中,我需要遍历文件的内容以生成固定大小的文件块的哈希值。最终目标是实现 Amazon Glacier 的 Tree Hash 算法,我几乎逐字复制了他们文档中的代码。

但是,当我通过 SonarQube 运行以下代码时会出现问题:

    byte[] buff = new byte[Mio];
    int bytesRead;

    while ((bytesRead = await inputStream.ReadAsync(buff, 0, Mio)) > 0) {
        // Process the bytes read
    }

我有一个与while循环有关的 Roslyn 问题。问题是“更改 'ReadAsync' 方法调用以使用 'Stream.ReadAsync(Memory, CancellationToken)' 重载”。根据描述,这是因为使用 Memory 类的方法比使用基本数组的方法效率更高。

当类可以从头到尾使用时,这可能是正确的。问题是,我需要将数据提供给ComputeHasha 的方法HashAlgorithm,并且它们没有任何接受 a 的覆盖Memory。这意味着我必须使用复制数据ToArray的方法。这对我来说听起来不是很有效。Memory

Memory我知道可以通过将现有数组传递给其构造函数来创建实例,如下所示:

    byte[] buff = new byte[Mio];
    Memory<byte> memory = new Memory<byte>(buff);
    int bytesRead;

    while ((bytesRead = await inputStream.ReadAsync(memory)) > 0) {
        // Use `buff` to access the bytes
    }

但是,文档不清楚传递给构造函数的数组是否实际上用作Memory实例的底层存储。

因此,这是我的问题:

编辑添加有关代码如何工作的其他信息:这是AWS 计算 Glacier Tree Hash 示例的第一部分,该部分计算文件中 1Mio 块的第一个哈希值。

这些是while上面循环的内容:


// Constructor of the class
// The class implements IDisposable to properly dispose of the Algorithm field
// Constructor is called like this
// `using TreeHash treeHash = new TreeHash(System.Security.Cryptography.SHA512.Create());`
public TreeHash(HashAlgorithm algo) {
    this.Algorithm = algo;
}


// Chunk hash generation loop
// first part of the tree hash algorithm
    byte[][] chunkHashes = new byte[numChunks][];

    byte[] buff = new byte[Mio];
    int bytesRead;
    int idx = 0;

    while ((bytesRead = await inputStream.ReadAsync(buff, 0, Mio)) > 0) {
        chunkHashes[idx++] = this.ComputeHash(buff, bytesRead);
    }


// Quick wrapper around the hash algorithm
// Also used by the second part of the tree hash computation
private byte[] ComputeHash(byte[] data, int count) => this.Algorithm.ComputeHash(data, 0, count);

默认情况下,我使用无前缀版本的哈希算法,但我可能可以切换到托管版本。如果需要,该方法可以变为非async.

标签: c#arrayssonarqubebufferfilestream

解决方案


以下应该工作。它利用MemoryPool<byte>来获取IMemoryOwner<byte>我们可以用来检索暂存缓冲区的内容。我们需要 aMemory<byte>传递给ReadAsync调用,所以我们传递Memory.IMemoryOwner<byte>

然后我们重构代码以使用HashAlgorithm.TryComputeHash接受 aReadOnlySpan<byte>作为源和 aSpan<byte>作为目标的方法。我们确实分配了一个新数组(而不是使用ArrayPool),因为您正在保留/存储数组。

byte[][] chunkHashes = new byte[numChunks][]; 

using var memory = MemoryPool<byte>.Shared.Rent(Mio);

int bytesRead;
int idx = 0; 

while ((bytesRead = await inputStream.ReadAsync(memory.Memory, CancellationToken.None)) > 0) 
{ 
   var tempBuff = new byte[(int)Math.Ceiling(this.Algorithm.HashSize/8.0)];
   if (this.Algorithm.TryComputeHash(memory.Memory.Span[..bytesRead] /*1*/, tempBuff, out var hashWritten)) 
   {
      chunkHashes[idx++] = hashWritten == tempBuff.Length ? tempBuff : tempBuff[..hashWritten] /*2*/;
   } 
   else
      throw new Exception("buffer not big enough");
}

对于源,我们传递缓冲区的Span属性,Memory<bytes>该属性再次从IMemoryOwner<byte>.Memory属性中检索。我们根据读取的字节数将其切成适当的长度。我们作为目标传递的Span<byte>必须至少是算法HashSize属性的大小,即散列所需的位数而不是字节数)。由于实现可能(尽管我认为不太可能)使用不是8 的倍数的大小,因此我们将除法上限设置为在必要时四舍五入。我们不需要调用AsSpan,因为从T[].

我相信*最终写入的字节数将始终与HashSize. 如果/当它是,我们只需使用原始数组。否则,我们需要根据写入的哈希字节数将其切成正确的长度。

如果缓冲区不够大,则TryComputeHash返回false并抛出异常。我相当肯定这不会发生在我们身上,因为我们是根据 明确计算大小的HashSize,但无论如何我们都会将这种情况作为最佳实践来处理。

我已经通过了CancellationToken.None,但您可以提供自己的令牌。我也使用Range语法而不是显式调用Slice. 如果您无法使用它,或者您只是不喜欢它的外观,您可以明确说明:

/*1*/ memory.Memory.Span.Slice(0, bytesRead)
/*2*/ tempBuff.AsSpan(0, hashWritten).ToArray()

我们可以做出一些可能的假设:

  • 假设HashSize总是 8 的倍数
  • 假设HashSize始终等于写入的字节数,并且不对最终数组进行切片
  • 假设我们总是提供一个足够大的缓冲区(根据上面将是所需的确切大小)并删除ifException
while ((bytesRead = await inputStream.ReadAsync(memory.Memory, CancellationToken.None)) > 0)
{
   var tempBuff = new byte[this.Algorithm.HashSize/8];
   _ = this.Algorithm.TryComputeHash(memory.Memory.Span[..bytesRead], tempBuff, out _);
   chunkHashes[idx++] = tempBuff;
} 

* 不幸的是,我不能 100% 肯定地说这些是可以做出的有效假设。我看过源代码的大多数实现都有一个Debug.Assert验证缓冲区大小和写入的字节是相同的,所以我认为它们是合理的。也就是说,我认为我个人会坚持使用更详细的选项。

您还会注意到我已经删除了您的ComputeHash功能。这并不是说您仍然不能使用它,但我将把它转换为Try基于这种的Memory<>模式作为练习留给读者。


推荐阅读