.net - How to solve memory issues when doing heavy serialization with memory streams
Problem
I have a monthly data-archiving webjob running on Azure that pulls one month of data for roughly 5,000 remote devices which periodically post data to Azure Tables. The basic approach after retrieving the data is to serialize it into a memory stream, GZIP-compress that stream into a second stream, and upload the resulting stream to Azure Data Lake blob storage. Here is the code currently in use:
public static async Task<String> ArchiveDataAsync(String container, String path, String filenameWithoutExtension, Object data)
{
    try
    {
        Console.WriteLine("Attempting to serialize, compress and upload data to data lake...");
        using (var uncompressedMs = new MemoryStream())
        using (var compressedMs = new MemoryStream())
        using (var streamWriter = new StreamWriter(uncompressedMs))
        {
            // Serialize data
            Console.WriteLine("Starting serialization...");
            var serializer = new JsonSerializer
            {
                NullValueHandling = NullValueHandling.Ignore,
                Formatting = Formatting.None
            };
            serializer.Serialize(streamWriter, data);
            uncompressedMs.Seek(0, SeekOrigin.Begin);
            Console.WriteLine("Serialization completed successfully.");
            Console.WriteLine("Starting data compression...");
            using (var compressionStream = new GZipStream(compressedMs, CompressionMode.Compress))
            {
                // Compress data
                await uncompressedMs.CopyToAsync(compressionStream);
                await compressionStream.FlushAsync();
                compressedMs.Seek(0, SeekOrigin.Begin);
                Console.WriteLine($"Data compression successfully completed. Compression rate achieved: {Math.Round(100 - (100.0 * compressedMs.Length / uncompressedMs.Length), 1)}%.");
                // Save data to Data Lake
                Console.WriteLine($"Starting data upload to {path + "/" + filenameWithoutExtension}.json.gz...");
                var dataLakeServiceClient = InSysDataLake.BlobStorage.GetDataLakeServiceClient();
                var dataLakeFileSystemClient = InSysDataLake.BlobStorage.GetFileSystemClient(dataLakeServiceClient, container);
                await InSysDataLake.BlobStorage.UploadFileFromMemoryStreamAsync(dataLakeFileSystemClient, path, filenameWithoutExtension + ".json.gz", compressedMs);
                Console.WriteLine("Data upload completed successfully.");
                return "OK";
            }
        }
    }
    catch (Exception ex)
    {
        Trace.TraceError(Helper.GenerateErrorMessage("InSysDataProcessing.DataArchiving.ArchiveDataAsync", ex));
        return $"ERROR: {ex.StackTrace}";
    }
}
The UploadFileFromMemoryStreamAsync method is:
public static async Task UploadFileFromMemoryStreamAsync(DataLakeFileSystemClient fileSystemClient, String directoryName, String filename, MemoryStream contentMS)
{
    DataLakeDirectoryClient directoryClient = fileSystemClient.GetDirectoryClient(directoryName);
    DataLakeFileClient fileClient = await directoryClient.CreateFileAsync(filename).ConfigureAwait(false);
    contentMS.Position = 0;
    var contentSize = contentMS.Length;
    await fileClient.AppendAsync(contentMS, offset: 0).ConfigureAwait(false);
    await fileClient.FlushAsync(position: contentSize).ConfigureAwait(false);
}
The final size of the compressed stream is about 5-20 MB, and the uncompressed serialized data can be 5-10 times larger (up to 250 MB). The process that uses this archiving method calls it to store the monthly data for each device, then aggregates all the data for the devices at a given location and archives that as well. It is called 5,000-10,000 times over the course of 5-10 hours.
Every now and then (but often enough to be a problem) I see the following error:
[07/30/2020 16:43:07 > 2ef8ea: INFO] Monthly location data for ... for 3/2019 FAILED to be archived: ERROR: at System.IO.MemoryStream.set_Capacity(Int32 value)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at System.IO.MemoryStream.EnsureCapacity(Int32 value)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at System.IO.MemoryStream.Write(Byte[] buffer, Int32 offset, Int32 count)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at System.IO.StreamWriter.Flush(Boolean flushStream, Boolean flushEncoder)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at System.IO.StreamWriter.Write(Char value)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.JsonTextWriter.WriteValueDelimiter()
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.JsonWriter.AutoComplete(JsonToken tokenBeingWritten)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.JsonTextWriter.WritePropertyName(String name, Boolean escape)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeObject(JsonWriter writer, Object value, JsonObjectContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeList(JsonWriter writer, IEnumerable values, JsonArrayContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeList(JsonWriter writer, IEnumerable values, JsonArrayContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeObject(JsonWriter writer, Object value, JsonObjectContract contract, JsonProperty member, JsonContainerContract collectionContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.SerializeValue(JsonWriter writer, Object value, JsonContract valueContract, JsonProperty member, JsonContainerContract containerContract, JsonProperty containerProperty)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.Serialization.JsonSerializerInternalWriter.Serialize(JsonWriter jsonWriter, Object value, Type objectType)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at Newtonsoft.Json.JsonSerializer.SerializeInternal(JsonWriter jsonWriter, Object value, Type objectType)
[07/30/2020 16:43:07 > 2ef8ea: INFO] at InSysDataProcessing.DataArchiving.ArchiveDataAsync(String container, String path, String filenameWithoutExtension, Object data) in E:\InergySystems\GitHub\InSysCore\InSysDataManagement\DataArchiving.cs:line 60.
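The top frames of the trace (MemoryStream.set_Capacity / EnsureCapacity) hint at the mechanism: every time a MemoryStream's internal buffer fills, it allocates a new array of at least double the capacity and copies the old contents over, leaving the old array for the GC. A minimal sketch of that growth behavior, using only the standard library, is:

```csharp
using System;
using System.IO;

class CapacityDemo
{
    static void Main()
    {
        // Write 64 MB into a MemoryStream in 64 KB chunks and report each
        // time the backing buffer is reallocated. Each grown buffer over
        // 85,000 bytes lands on the large object heap, and the discarded
        // predecessors are what can fragment it over thousands of calls.
        using (var ms = new MemoryStream())
        {
            long lastCapacity = -1;
            var chunk = new byte[64 * 1024];
            for (int i = 0; i < 1024; i++)
            {
                ms.Write(chunk, 0, chunk.Length);
                if (ms.Capacity != lastCapacity)
                {
                    Console.WriteLine($"Capacity grew to {ms.Capacity / (1024.0 * 1024.0):F1} MB");
                    lastCapacity = ms.Capacity;
                }
            }
        }
    }
}
```

For a 250 MB payload this doubling walks through buffers of roughly 64, 128, and 256 MB, so the transient footprint of one archive call is far larger than the final stream, and set_Capacity throws once no sufficiently large contiguous region can be found.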
This only happens for the aggregated location data, i.e. the larger data sets. It looks like memory becomes too fragmented after a while. In the process that calls the archiving function I added:
GCSettings.LargeObjectHeapCompactionMode = GCLargeObjectHeapCompactionMode.CompactOnce;
GC.Collect();
after every call to the archiver, but that did not help. I also tried scaling up the App Service plan the webjob runs on (currently P3v2 with 16 GB RAM, which was no better than the 7 GB plan).
At this point I am not sure what to try next.
Solution
Thanks to the links provided in the comments on the question above, I reworked the code and it is now simpler and more robust. Here is the corrected method now in use, with no memory errors even for >450 MB of uncompressed serialized data:
const int BufferSize = 16384;

public static async Task<String> ArchiveDataAsync(String container, String path, String filenameWithoutExtension, Object data)
{
    Console.WriteLine("Attempting to serialize, compress and upload data to data lake...");
    try
    {
        using (var ms = new MemoryStream())
        {
            using (var gzip = new GZipStream(ms, CompressionMode.Compress, true))
            {
                using (var writer = new StreamWriter(gzip, Encoding.UTF8, BufferSize))
                {
                    var serializer = new JsonSerializer
                    {
                        NullValueHandling = NullValueHandling.Ignore,
                        Formatting = Formatting.None
                    };
                    serializer.Serialize(writer, data);
                }
            }
            ms.Seek(0, SeekOrigin.Begin);
            // Save data to Data Lake
            Console.WriteLine($"Starting data upload to {path + "/" + filenameWithoutExtension}.json.gz...");
            var dataLakeServiceClient = BlobStorage.GetDataLakeServiceClient();
            var dataLakeFileSystemClient = BlobStorage.GetFileSystemClient(dataLakeServiceClient, container);
            await BlobStorage.UploadFileFromMemoryStreamAsync(dataLakeFileSystemClient, path, filenameWithoutExtension + ".json.gz", ms);
            Console.WriteLine("Data upload completed successfully.");
            return "OK";
        }
    }
    catch (Exception ex)
    {
        Trace.TraceError(Helper.GenerateErrorMessage("InSysDataProcessing.DataArchiving.Json.ArchiveDataAsync", ex));
        return $"ERROR: {ex.StackTrace}";
    }
}