首页 > 解决方案 > C# - OutOfMemoryException 在 JSON 文件中保存列表

问题描述

我正在尝试保存压力图的流数据。基本上我有一个压力矩阵定义为:

double[,] pressureMatrix = new double[e.Data.GetLength(0), e.Data.GetLength(1)];

基本上,我pressureMatrix每 10 毫秒就会得到一个,我想将所有信息保存在 JSON 文件中,以便以后能够重现它。

我要做的是,首先,用所有用于记录的设置写我称之为标题的内容,如下所示:

recordedData.softwareVersion = Assembly.GetExecutingAssembly().GetName().Version.Major.ToString() + "." + Assembly.GetExecutingAssembly().GetName().Version.Minor.ToString();
recordedData.calibrationConfiguration = calibrationConfiguration;
recordedData.representationConfiguration = representationSettings;
recordedData.pressureData = new List<PressureMap>();

var json = JsonConvert.SerializeObject(csvRecordedData, Formatting.None);

File.WriteAllText(this.filePath, json);

然后,每次我得到一个新的压力图时,我都会创建一个新的线程来添加新的PressureMatrix并重写文件:

var newPressureMatrix = new PressureMap(datos, DateTime.Now);
recordedData.pressureData.Add(newPressureMatrix);
var json = JsonConvert.SerializeObject(recordedData, Formatting.None);
File.WriteAllText(this.filePath, json);

大约 20-30 分钟后,我得到一个 OutOfMemory 异常,因为系统无法保存记录的数据var,因为List<PressureMatrix>它太大了。

我该如何处理以保存数据?我想保存24-48小时的信息。

标签: c#.netjsonmultithreadingjsonconvert

解决方案


您的基本问题是您将所有压力图样本保存在内存中,而不是单独编写每个样本,然后允许对其进行垃圾收集。更糟糕的是,您在两个不同的地方执行此操作:

  1. json在将字符串写入文件之前 ,您将整个样本列表序列化为 JSON 字符串。

    相反,正如性能提示:优化内存使用中所述,在这种情况下,您应该直接对文件进行序列化和反序列化。有关如何执行此操作的说明,请参见Can Json.NET serialize / deserialize to / from a stream? 的答案?并将JSON 序列化为文件

  2. recordedData.pressureData = new List<PressureMap>();累积所有压力图样本,然后在每次制作样本时将它们全部写入。

    更好的解决方案是编写每个样本一次并忘记它,但是每个样本都嵌套在 JSON 中的一些容器对象中的要求使得如何做到这一点并不明显。

那么,如何解决问题#2?

首先,让我们如下修改您的数据模型,将标头数据划分为一个单独的类:

public class PressureMap
{
    public double[,] PressureMatrix { get; set; }
}

public class CalibrationConfiguration 
{
    // Data model not included in question
}

public class RepresentationConfiguration 
{
    // Data model not included in question
}

public class RecordedDataHeader
{
    public string SoftwareVersion { get; set; }
    public CalibrationConfiguration CalibrationConfiguration { get; set; }
    public RepresentationConfiguration RepresentationConfiguration { get; set; }
}

public class RecordedData
{
    // Ensure the header is serialized first.
    [JsonProperty(Order = 1)]
    public RecordedDataHeader RecordedDataHeader { get; set; }
    // Ensure the pressure data is serialized last.
    [JsonProperty(Order = 2)]
    public IEnumerable<PressureMap> PressureData { get; set; }
}

选项 #1生产者-消费者模式的一个版本。它涉及启动两个线程:一个用于生成PressureData样本,一个用于序列化RecordedData. 第一个线程将生成样本并将它们添加到BlockingCollection<PressureMap>传递给第二个线程的集合中。然后第二个线程将序列BlockingCollection<PressureMap>.GetConsumingEnumerable() 化为 的值RecordedData.PressureData

以下代码给出了如何执行此操作的框架:

var sampleCount = 400;    // Or whatever stopping criterion you prefer
var sampleInterval = 10;  // in ms

using (var pressureData = new BlockingCollection<PressureMap>())
{
    // Adapted from
    // https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/blockingcollection-overview
    // https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1?view=netframework-4.7.2

    // Spin up a Task to sample the pressure maps
    using (Task t1 = Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < sampleCount; i++)
        {
            var data = GetPressureMap(i);
            Console.WriteLine("Generated sample {0}", i);
            pressureData.Add(data);
            System.Threading.Thread.Sleep(sampleInterval);
        }
        pressureData.CompleteAdding();
    }))
    {
        // Spin up a Task to consume the BlockingCollection
        using (Task t2 = Task.Factory.StartNew(() =>
        {
            var recordedDataHeader = new RecordedDataHeader
            {
                SoftwareVersion = softwareVersion,
                CalibrationConfiguration = calibrationConfiguration,
                RepresentationConfiguration = representationConfiguration,
            };

            var settings = new JsonSerializerSettings
            {
                ContractResolver = new CamelCasePropertyNamesContractResolver(),
            };

            using (var stream = new FileStream(this.filePath, FileMode.Create))
            using (var textWriter = new StreamWriter(stream))
            using (var jsonWriter = new JsonTextWriter(textWriter))
            {
                int j = 0;

                var query = pressureData
                    .GetConsumingEnumerable()
                    .Select(p => 
                            { 
                                // Flush the writer periodically in case the process terminates abnormally
                                jsonWriter.Flush();
                                Console.WriteLine("Serializing item {0}", j++);
                                return p;
                            });

                var recordedData = new RecordedData
                {
                    RecordedDataHeader = recordedDataHeader,
                    // Since PressureData is declared as IEnumerable<PressureMap>, evaluation will be lazy.
                    PressureData = query,
                };                          

                Console.WriteLine("Beginning serialization of {0} to {1}:", recordedData, this.filePath);
                JsonSerializer.CreateDefault(settings).Serialize(textWriter, recordedData);
                Console.WriteLine("Finished serialization of {0} to {1}.", recordedData, this.filePath);
            }
        }))
        {
            Task.WaitAll(t1, t2);
        }
    }
}

笔记:

  • 此解决方案使用的事实是,在序列化.NET 时IEnumerable<T>,Json.NET不会将枚举物化为列表。相反,它将充分利用惰性评估并简单地枚举它,写入然后忘记遇到的每个单独的项目。

  • 第一个线程采样PressureData并将它们添加到阻塞集合中。

  • 第二个线程将阻塞集合包装在 anIEnumerable<PressureData>然后将其序列化为RecordedData.PressureData.

    在序列化过程中,序列化程序将通过枚举进行IEnumerable<PressureData>枚举,将每个流式传输到 JSON 文件,然后继续执行下一个 - 有效地阻塞直到一个可用。

  • 您将需要做一些实验以确保序列化线程可以“跟上”采样线程,可能通过BoundedCapacity在构造期间设置 a。如果没有,您可能需要采用不同的策略。

  • PressureMap GetPressureMap(int count)应该是您的某种方法(问题中未显示),它返回当前压力图样本。

  • 在这种技术中,JSON 文件在采样会话期间保持打开状态。如果采样异常终止,文件可能会被截断。我尝试通过定期刷新写入器来改善问题。

  • 虽然数据序列化将不再需要无限量的内存,但反序列化RecordedData稍后会将PressureData数组反序列化为具体的List<PressureMap>. 这可能会导致下游处理期间出现内存问题。

演示小提琴#1在这里

选项 #2是从 JSON 文件切换到换行符分隔的 JSON文件。这样的文件由由换行符分隔的 JSON 对象序列组成。在您的情况下,您将使第一个对象包含RecordedDataHeader信息,而后续对象的类型为PressureMap

var sampleCount = 100; // Or whatever
var sampleInterval = 10;

var recordedDataHeader = new RecordedDataHeader
{
    SoftwareVersion = softwareVersion,
    CalibrationConfiguration = calibrationConfiguration,
    RepresentationConfiguration = representationConfiguration,
};

var settings = new JsonSerializerSettings
{
    ContractResolver = new CamelCasePropertyNamesContractResolver(),
};

// Write the header
Console.WriteLine("Beginning serialization of sample data to {0}.", this.filePath);

using (var stream = new FileStream(this.filePath, FileMode.Create))
{
    JsonExtensions.ToNewlineDelimitedJson(stream, new[] { recordedDataHeader });
}

// Write each sample incrementally

for (int i = 0; i < sampleCount; i++)
{
    Thread.Sleep(sampleInterval);
    Console.WriteLine("Performing sample {0} of {1}", i, sampleCount);
    var map = GetPressureMap(i);

    using (var stream = new FileStream(this.filePath, FileMode.Append))
    {
        JsonExtensions.ToNewlineDelimitedJson(stream, new[] { map });
    }
}

Console.WriteLine("Finished serialization of sample data to {0}.", this.filePath);

使用扩展方法:

public static partial class JsonExtensions
{
    // Adapted from the answer to
    // https://stackoverflow.com/questions/44787652/serialize-as-ndjson-using-json-net
    // by dbc https://stackoverflow.com/users/3744182/dbc
    public static void ToNewlineDelimitedJson<T>(Stream stream, IEnumerable<T> items)
    {
        // Let caller dispose the underlying stream 
        using (var textWriter = new StreamWriter(stream, new UTF8Encoding(false, true), 1024, true))
        {
            ToNewlineDelimitedJson(textWriter, items);
        }
    }

    public static void ToNewlineDelimitedJson<T>(TextWriter textWriter, IEnumerable<T> items)
    {
        var serializer = JsonSerializer.CreateDefault();

        foreach (var item in items)
        {
            // Formatting.None is the default; I set it here for clarity.
            using (var writer = new JsonTextWriter(textWriter) { Formatting = Formatting.None, CloseOutput = false })
            {
                serializer.Serialize(writer, item);
            }
            // http://specs.okfnlabs.org/ndjson/
            // Each JSON text MUST conform to the [RFC7159] standard and MUST be written to the stream followed by the newline character \n (0x0A). 
            // The newline charater MAY be preceeded by a carriage return \r (0x0D). The JSON texts MUST NOT contain newlines or carriage returns.
            textWriter.Write("\n");
        }
    }

    // Adapted from the answer to 
    // https://stackoverflow.com/questions/29729063/line-delimited-json-serializing-and-de-serializing
    // by Yuval Itzchakov https://stackoverflow.com/users/1870803/yuval-itzchakov
    public static IEnumerable<TBase> FromNewlineDelimitedJson<TBase, THeader, TRow>(TextReader reader)
        where THeader : TBase
        where TRow : TBase
    {
        bool first = true;

        using (var jsonReader = new JsonTextReader(reader) { CloseInput = false, SupportMultipleContent = true })
        {
            var serializer = JsonSerializer.CreateDefault();

            while (jsonReader.Read())
            {
                if (jsonReader.TokenType == JsonToken.Comment)
                    continue;
                if (first)
                {
                    yield return serializer.Deserialize<THeader>(jsonReader);
                    first = false;
                }
                else
                {
                    yield return serializer.Deserialize<TRow>(jsonReader);
                }
            }
        }
    }
}

稍后,您可以按如下方式处理换行符分隔的 JSON 文件:

using (var stream = File.OpenRead(filePath))
using (var textReader = new StreamReader(stream))
{
    foreach (var obj in JsonExtensions.FromNewlineDelimitedJson<object, RecordedDataHeader, PressureMap>(textReader))
    {
        if (obj is RecordedDataHeader)
        {
            var header = (RecordedDataHeader)obj;
            // Process the header
            Console.WriteLine(JsonConvert.SerializeObject(header));
        }
        else
        {
            var row = (PressureMap)obj;
            // Process the row.
            Console.WriteLine(JsonConvert.SerializeObject(row));
        }
    }
}

笔记:

  • 这种方法看起来更简单,因为样本是以增量方式添加到文件末尾的,而不是插入到某个整体 JSON 容器中。

  • 使用这种方法,序列化和下游处理都可以通过有限的内存使用来完成。

  • 样本文件在采样期间不会保持打开状态,因此不太可能被截断。

  • 下游应用程序可能没有用于处理换行分隔 JSON 的内置工具。

  • 这种策略可以更简单地与您当前的线程代码集成。

演示小提琴#2在这里


推荐阅读