How to solve memory problems when doing heavy serialization with memory streams

Problem description

I have a monthly data-archiving web job running on Azure that pulls one month of data for roughly 5,000 remote devices which periodically post their data to Azure Tables. Once the data is retrieved, the basic approach is to serialize it into a memory stream, GZip-compress that stream into a second stream, and then upload the resulting stream to Azure Data Lake blob storage. This is the code currently in use:

public static async Task<String> ArchiveDataAsync(String container, String path, String filenameWithoutExtension, Object data)
{
    try
    {
        Console.WriteLine("Attempting to serialize, compress and upload data to data lake...");
        using (var uncompressedMs = new MemoryStream())
        using (var compressedMs = new MemoryStream())
        using (var streamWriter = new StreamWriter(uncompressedMs))
        {
            // Serialize data
            Console.WriteLine("Starting serialization...");
            var serializer = new JsonSerializer
            {
                NullValueHandling = NullValueHandling.Ignore,
                Formatting = Formatting.None
            };
            serializer.Serialize(streamWriter, data);
            uncompressedMs.Seek(0, SeekOrigin.Begin);
            Console.WriteLine("Serialization completed successfully.");

            Console.WriteLine("Starting data compression...");
            using (var compressionStream = new GZipStream(compressedMs, CompressionMode.Compress))
            {
                // Compress data
                await uncompressedMs.CopyToAsync(compressionStream);
                await compressionStream.FlushAsync();
                compressedMs.Seek(0, SeekOrigin.Begin);
                Console.WriteLine($"Data compression successfully completed. Compression rate achieved: {Math.Round(100 - (100.0 * compressedMs.Length / uncompressedMs.Length), 1)}%.");

                // Save data to Data Lake
                Console.WriteLine($"Starting data upload to {path + "/" + filenameWithoutExtension}.json.gz...");
                var dataLakeServiceClient = InSysDataLake.BlobStorage.GetDataLakeServiceClient();
                var dataLakeFileSystemClient = InSysDataLake.BlobStorage.GetFileSystemClient(dataLakeServiceClient, container);
                await InSysDataLake.BlobStorage.UploadFileFromMemoryStreamAsync(dataLakeFileSystemClient, path, filenameWithoutExtension + ".json.gz", compressedMs);
                Console.WriteLine("Data upload completed successfully.");
                return "OK";
            }
        }
    }
    catch (Exception ex)
    {
        Trace.TraceError(Helper.GenerateErrorMessage("InSysDataProcessing.DataArchiving.ArchiveDataAsync", ex));
        return $"ERROR: {ex.StackTrace}";
    }
}
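The stack trace below bottoms out in `MemoryStream.set_Capacity`, which hints at why large payloads hurt: a `MemoryStream` grows by (at least) doubling its backing byte array, so buffering hundreds of megabytes of JSON allocates and abandons a whole chain of large arrays, each of which lands on the Large Object Heap once it passes the ~85 KB threshold. A minimal sketch (illustrative only, not part of the archiver) that makes the doubling visible:

```csharp
using System;
using System.IO;

class CapacityGrowthDemo
{
    static void Main()
    {
        using var ms = new MemoryStream();
        var chunk = new byte[64 * 1024];
        long lastCapacity = -1;

        // Write 4 MB in 64 KB chunks and watch the backing array grow.
        // Each time Capacity changes, the old array is discarded and a
        // roughly twice-as-large one is allocated (on the LOH once it
        // exceeds ~85 KB).
        for (int i = 0; i < 64; i++)
        {
            ms.Write(chunk, 0, chunk.Length);
            if (ms.Capacity != lastCapacity)
            {
                Console.WriteLine($"Length {ms.Length,9:N0} -> Capacity {ms.Capacity,9:N0}");
                lastCapacity = ms.Capacity;
            }
        }
    }
}
```

On current .NET runtimes this prints a doubling capacity sequence (65,536; 131,072; ... 4,194,304): every step is a fresh allocation, which is what fragments the LOH when thousands of streams each grow into the hundreds of megabytes.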

The UploadFileFromMemoryStreamAsync method is:

public static async Task UploadFileFromMemoryStreamAsync(DataLakeFileSystemClient fileSystemClient, String directoryName, String filename, MemoryStream contentMS)
{
    DataLakeDirectoryClient directoryClient = fileSystemClient.GetDirectoryClient(directoryName);
    DataLakeFileClient fileClient = await directoryClient.CreateFileAsync(filename).ConfigureAwait(false);

    contentMS.Position = 0;
    var contentSize = contentMS.Length;
    await fileClient.AppendAsync(contentMS, offset: 0).ConfigureAwait(false);
    await fileClient.FlushAsync(position: contentSize).ConfigureAwait(false);
}

The compressed stream ends up at roughly 5-20 MB, and the uncompressed serialized data can be 5-10 times larger (up to 250 MB). The process that uses this archiving method calls it to store each device's monthly data, then aggregates all the data for the devices at a given location and archives that as well. It is called 5-10,000 times over the course of 5-10 hours.

Every now and then (but often enough to be a problem) I see the following error:

[07/30/2020 16:43:07 > 2ef8ea: INFO] Monthly location data for ... for 3/2019 FAILED to be archived: ERROR:    at System.IO.MemoryStream.set_Capacity(Int32 value)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at System.IO.MemoryStream.EnsureCapacity(Int32 value)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at System.IO.MemoryStream.Write(Byte[] buffer, Int32 offset, Int32 count)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at System.IO.StreamWriter.Flush(Boolean flushStream, Boolean flushEncoder)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at System.IO.StreamWriter.Write(Char value)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.JsonTextWriter.WriteValueDelimiter()
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.JsonWriter.AutoComplete(JsonToken tokenBeingWritten)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.JsonTextWriter.WritePropertyName(String name, Boolean escape)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeObject(JsonWriter writer, Object value, JsonObjectContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeList(JsonWriter writer, IEnumerable values, JsonArrayContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeList(JsonWriter writer, IEnumerable values, JsonArrayContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeObject(JsonWriter writer, Object value, JsonObjectContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.Serialize(JsonWriter jsonWriter, Object value, Type objectType)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at Newtonsoft.Json.JsonSerializer.SerializeInternal(JsonWriter jsonWriter, Object value, Type objectType)
[07/30/2020 16:43:07 > 2ef8ea: INFO]    at InSysDataProcessing.DataArchiving.ArchiveDataAsync(String container, String path, String filenameWithoutExtension, Object data) in E:\InergySystems\GitHub\InSysCore\InSysDataManagement\DataArchiving.cs:line 60.

This only happens for the aggregated location data, i.e. the larger data sets. It looks like memory becomes too fragmented after a while. In the process that calls the archiving function, I added the following after every call to the archiver:

GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect();

but it did not help. I also tried scaling up the App Service plan the web job runs on (currently P3v2 with 16 GB RAM, but it fared no better than the 7 GB RAM plan).
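One detail worth knowing about that GC workaround: `CompactOnce` is a one-shot setting. It applies to the next blocking full collection and then resets itself to `Default`, so it only compacts the LOH after the fact; it cannot prevent the large transient allocations in the first place. A small illustrative sketch (not part of the archiver) showing the reset:

```csharp
using System;
using System.Runtime;

class LohCompactionDemo
{
    static void Main()
    {
        // Request LOH compaction for the next blocking full GC.
        GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
        Console.WriteLine(GCSettings.LargeObjectHeapCompactionMode); // CompactOnce

        GC.Collect(); // blocking full collection; the LOH is compacted here

        // The setting automatically reverts once the compaction has run.
        Console.WriteLine(GCSettings.LargeObjectHeapCompactionMode); // Default
    }
}
```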

At this point I'm not sure what to try next.

Tags: .net, .net-core, json.net

Solution


Thanks to the links provided in the comments on the question above, I modified the code and it is now simpler and more robust. Here is the corrected method now in use, with no memory errors even for >450 MB of uncompressed serialized data:

const int BufferSize = 16384;

public static async Task<String> ArchiveDataAsync(String container, String path, String filenameWithoutExtension, Object data)
{
    Console.WriteLine("Attempting to serialize, compress and upload data to data lake...");

    try
    {
        using (var ms = new MemoryStream())
        {
            using (var gzip = new GZipStream(ms, CompressionMode.Compress, true))
            {
                using (var writer = new StreamWriter(gzip, Encoding.UTF8, BufferSize))
                {
                    var serializer = new JsonSerializer
                    {
                        NullValueHandling = NullValueHandling.Ignore,
                        Formatting = Formatting.None
                    };
                    serializer.Serialize(writer, data);
                }
            }

            ms.Seek(0, SeekOrigin.Begin);

            // Save data to Data Lake
            Console.WriteLine($"Starting data upload to {path + "/" + filenameWithoutExtension}.json.gz...");
            var dataLakeServiceClient = BlobStorage.GetDataLakeServiceClient();
            var dataLakeFileSystemClient = BlobStorage.GetFileSystemClient(dataLakeServiceClient, container);
            await BlobStorage.UploadFileFromMemoryStreamAsync(dataLakeFileSystemClient, path, filenameWithoutExtension + ".json.gz", ms);
            Console.WriteLine("Data upload completed successfully.");
            return "OK";
        }
    }
    catch (Exception ex)
    {
        Trace.TraceError(Helper.GenerateErrorMessage("InSysDataProcessing.DataArchiving.Json.ArchiveDataAsync", ex));
        return $"ERROR: {ex.StackTrace}";
    }
}
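The key change is that the serializer now writes through the compressor: `StreamWriter` feeds `GZipStream`, which feeds the single `MemoryStream`, so only the compressed bytes (5-20 MB) are ever buffered and the uncompressed JSON never exists in memory at once. A self-contained round-trip sketch of the same chaining, using plain text in place of the serialized JSON to avoid the Newtonsoft and Azure dependencies:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Text;

class StreamChainingDemo
{
    static void Main()
    {
        var payload = new string('x', 1_000_000);   // stand-in for the serialized JSON

        using var ms = new MemoryStream();
        // leaveOpen: true so disposing the gzip stream (which writes the
        // gzip footer) does not also close the underlying MemoryStream.
        using (var gzip = new GZipStream(ms, CompressionMode.Compress, leaveOpen: true))
        using (var writer = new StreamWriter(gzip, Encoding.UTF8, 16384))
        {
            writer.Write(payload);
        }   // disposing the writer flushes through gzip and finalizes the stream

        ms.Seek(0, SeekOrigin.Begin);
        Console.WriteLine($"Compressed {payload.Length:N0} chars into {ms.Length:N0} bytes");

        // Round-trip to confirm the buffered bytes form a complete gzip member.
        using var inflate = new GZipStream(ms, CompressionMode.Decompress);
        using var reader = new StreamReader(inflate, Encoding.UTF8);
        Console.WriteLine(reader.ReadToEnd() == payload ? "round-trip OK" : "MISMATCH");
    }
}
```

Disposing the writers before seeking matters: the gzip footer is only written when the `GZipStream` is closed, so uploading before that point would produce a truncated archive.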
