首页 > 解决方案 > 从 Azure blob 读取 Parquet 文件,无需在本地下载它 c# .net

问题描述

我们有一个 parquet 格式文件 (500 mb),它位于 Azure blob 中。如何直接从 blob 读取文件并保存在 c# 的内存中,例如:Datatable。

我可以使用以下代码读取实际位于文件夹中的镶木地板文件。

public void ReadParqueFile()
    {
         using (Stream fileStream = System.IO.File.OpenRead("D:/../userdata1.parquet"))     
        {
            using (var parquetReader = new ParquetReader(fileStream))
            {
                DataField[] dataFields = parquetReader.Schema.GetDataFields();

                for (int i = 0; i < parquetReader.RowGroupCount; i++)
                {

                    using (ParquetRowGroupReader groupReader = parquetReader.OpenRowGroupReader(i))
                    {
                        DataColumn[] columns = dataFields.Select(groupReader.ReadColumn).ToArray();

                        DataColumn firstColumn = columns[0];

                        Array data = firstColumn.Data;
                        //int[] ids = (int[])data;
                    }
                }
           }
        }

    }
}

(我可以使用 sourcestream 直接从 blob 读取 csv 文件)。请建议一种最快的方法来直接从 blob 读取镶木地板文件

标签: c#azureblobparquet

解决方案


根据我的经验,直接从 blob 读取 parquet 文件的解决方案是首先使用 sas 令牌生成 blob url,然后使用 sasHttpClient从 url 获取流,最后通过ParquetReader.

首先,请参考Create a service SAS for a blob官方文档Create a service SAS for a container or blob with .NET中使用 Azure Blob Storage SDK for .NET Core 部分的示例代码。

private static string GetBlobSasUri(CloudBlobContainer container, string blobName, string policyName = null)
{
    string sasBlobToken;

    // Get a reference to a blob within the container.
    // Note that the blob may not exist yet, but a SAS can still be created for it.
    CloudBlockBlob blob = container.GetBlockBlobReference(blobName);

    if (policyName == null)
    {
        // Create a new access policy and define its constraints.
        // Note that the SharedAccessBlobPolicy class is used both to define the parameters of an ad hoc SAS, and
        // to construct a shared access policy that is saved to the container's shared access policies.
        SharedAccessBlobPolicy adHocSAS = new SharedAccessBlobPolicy()
        {
            // When the start time for the SAS is omitted, the start time is assumed to be the time when the storage service receives the request.
            // Omitting the start time for a SAS that is effective immediately helps to avoid clock skew.
            SharedAccessExpiryTime = DateTime.UtcNow.AddHours(24),
            Permissions = SharedAccessBlobPermissions.Read | SharedAccessBlobPermissions.Write | SharedAccessBlobPermissions.Create
        };

        // Generate the shared access signature on the blob, setting the constraints directly on the signature.
        sasBlobToken = blob.GetSharedAccessSignature(adHocSAS);

        Console.WriteLine("SAS for blob (ad hoc): {0}", sasBlobToken);
        Console.WriteLine();
    }
    else
    {
        // Generate the shared access signature on the blob. In this case, all of the constraints for the
        // shared access signature are specified on the container's stored access policy.
        sasBlobToken = blob.GetSharedAccessSignature(null, policyName);

        Console.WriteLine("SAS for blob (stored access policy): {0}", sasBlobToken);
        Console.WriteLine();
    }

    // Return the URI string for the container, including the SAS token.
    return blob.Uri + sasBlobToken;
}

HttpClient然后使用 sas token 从 url获取 http 响应流。

var blobUrlWithSAS = GetBlobSasUri(container, blobName);
var client = new HttpClient();
var stream = await client.GetStreamAsync(blobUrlWithSAS);

最后通过阅读ParquetReader,代码来自Reading DataGitHub repo aloneguid/parquet-dotnet

var options = new ParquetOptions { TreatByteArrayAsString = true };
var reader = new ParquetReader(stream, options);

推荐阅读