首页 > 解决方案 > 线程安全写入缓存到数据库的最佳模式是什么?

问题描述

我有一个可以被多个线程调用的方法,将数据写入数据库。为了减少数据库流量,我缓存数据并批量写入。

现在我想知道,有没有更好的(例如无锁模式)可以使用?

这是一个示例,我现在是如何做到的?

    public class WriteToDatabase : IWriter, IDisposable
    {
        public WriteToDatabase(PLCProtocolServiceConfig currentConfig)
        {
            writeTimer = new System.Threading.Timer(Writer);
            writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
            this.currentConfig = currentConfig;
        }

        private System.Threading.Timer writeTimer;
        private List<PlcProtocolDTO> writeChache = new List<PlcProtocolDTO>();
        private readonly PLCProtocolServiceConfig currentConfig;
        private bool disposed;

        public void Write(PlcProtocolDTO row)
        {
            lock (this)
            {
                writeChache.Add(row);
            }
        }

        private void Writer(object state)
        {
            List<PlcProtocolDTO> oldCachce = null;
            lock (this)
            {
                if (writeChache.Count > 0)
                {
                    oldCachce = writeChache;
                    writeChache = new List<PlcProtocolDTO>();
                }
            }

            if (oldCachce != null)
            {
                    using (var s = VisuDL.CreateSession())
                    {
                        s.Insert(oldCachce);
                    }
            }

            if (!this.disposed)
                writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
        }

        public void Dispose()
        {
            this.disposed = true;
            writeTimer.Dispose();
            Writer(null);
        }
    }

标签: c#multithreadingcachingdesign-patterns

解决方案


我可以看到基于计时器的代码存在一些问题。

  • 即使在新版本的代码中,仍然有可能在重新启动或关闭时丢失写入。该Dispose方法不等待当前可能正在进行的最后一个计时器回调完成。由于计时器回调在线程池线程(后台线程)上运行,因此它们将在主线程退出时中止。
  • 批处理的大小没有限制,当您达到底层存储 API 的限制时(例如 sql 数据库对查询长度和使用的参数数量有限制),这将被打破。
  • 由于您正在执行 i/o,因此实现可能应该是异步的
  • 这将在负载下表现不佳。特别是随着负载不断增加,批处理将变得更大,因此执行速度变慢,而较慢的批处理执行反过来又会给下一个额外的时间来累积项目,从而使它们变得更慢,等等......最终写入批处理将失败(如果您达到 sql 限制或查询超时)或应用程序将内存不足。要处理高负载,您实际上只有两个选择,即应用背压(即减慢生产者的速度)或放弃写入。
  • 如果数据库可以处理它,您可能希望允许有限数量的并发写入者。
  • 场上的比赛条件disposed可能会导致ObjectDisposedExceptionin writeTimer.Change

我认为解决上述问题的更好模式是消费者-生产者模式,您可以使用 ConcurrentQueue 或新的 System.Threading.Channels api 在 .net 中实现它。

还要记住,如果您的应用程序因任何原因崩溃,您将丢失仍在缓冲的记录。

这是使用通道的示例实现:

public interface IWriter<in T>
{
    ValueTask WriteAsync(IEnumerable<T> items);
}

public sealed record Options(int BatchSize, TimeSpan Interval, int MaxPendingWrites, int Concurrency);

public class BatchWriter<T> : IWriter<T>, IAsyncDisposable
{
    readonly IWriter<T> writer;
    readonly Options options;
    readonly Channel<T> channel;
    readonly Task[] consumers;

    public BatchWriter(IWriter<T> writer, Options options)
    {
        this.writer = writer;
        this.options = options;

        channel = Channel.CreateBounded<T>(new BoundedChannelOptions(options.MaxPendingWrites)
        {
            // Choose between backpressure (Wait) or
            // various ways to drop writes (DropNewest, DropOldest, DropWrite).
            FullMode = BoundedChannelFullMode.Wait,

            SingleWriter = false,
            SingleReader = options.Concurrency == 1
        });

        consumers = Enumerable.Range(start: 0, options.Concurrency)
            .Select(_ => Task.Run(Start))
            .ToArray();
    }

    async Task Start()
    {
        var batch = new List<T>(options.BatchSize);

        var timer = Task.Delay(options.Interval);
        var canRead = channel.Reader.WaitToReadAsync().AsTask();

        while (true)
        {
            if (await Task.WhenAny(timer, canRead) == timer)
            {
                timer = Task.Delay(options.Interval);
                await Flush(batch);
            }
            else if (await canRead)
            {
                while (channel.Reader.TryRead(out var item))
                {
                    batch.Add(item);

                    if (batch.Count == options.BatchSize)
                    {
                        await Flush(batch);
                    }
                }

                canRead = channel.Reader.WaitToReadAsync().AsTask();
            }
            else
            {
                await Flush(batch);
                return;
            }
        }

        async Task Flush(ICollection<T> items)
        {
            if (items.Count > 0)
            {
                await writer.WriteAsync(items);
                items.Clear();
            }
        }
    }

    public async ValueTask WriteAsync(IEnumerable<T> items)
    {
        foreach (var item in items)
        {
            await channel.Writer.WriteAsync(item);
        }
    }

    public async ValueTask DisposeAsync()
    {
        channel.Writer.Complete();
        await Task.WhenAll(consumers);
    }
}

推荐阅读