c# - 线程安全写入缓存到数据库的最佳模式是什么?
问题描述
我有一个可以被多个线程调用的方法,将数据写入数据库。为了减少数据库流量,我缓存数据并批量写入。
现在我想知道,有没有更好的(例如无锁模式)可以使用?
这是一个示例,我现在是如何做到的?
public class WriteToDatabase : IWriter, IDisposable
{
public WriteToDatabase(PLCProtocolServiceConfig currentConfig)
{
writeTimer = new System.Threading.Timer(Writer);
writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
this.currentConfig = currentConfig;
}
private System.Threading.Timer writeTimer;
private List<PlcProtocolDTO> writeChache = new List<PlcProtocolDTO>();
private readonly PLCProtocolServiceConfig currentConfig;
private bool disposed;
public void Write(PlcProtocolDTO row)
{
lock (this)
{
writeChache.Add(row);
}
}
private void Writer(object state)
{
List<PlcProtocolDTO> oldCachce = null;
lock (this)
{
if (writeChache.Count > 0)
{
oldCachce = writeChache;
writeChache = new List<PlcProtocolDTO>();
}
}
if (oldCachce != null)
{
using (var s = VisuDL.CreateSession())
{
s.Insert(oldCachce);
}
}
if (!this.disposed)
writeTimer.Change((int)currentConfig.WriteToDatabaseTimer.TotalMilliseconds, Timeout.Infinite);
}
public void Dispose()
{
this.disposed = true;
writeTimer.Dispose();
Writer(null);
}
}
解决方案
我可以看到基于计时器的代码存在一些问题。
- 即使在新版本的代码中,仍然有可能在重新启动或关闭时丢失写入。该
Dispose
方法不等待当前可能正在进行的最后一个计时器回调完成。由于计时器回调在线程池线程(后台线程)上运行,因此它们将在主线程退出时中止。 - 批处理的大小没有限制,当您达到底层存储 API 的限制时(例如 sql 数据库对查询长度和使用的参数数量有限制),这将被打破。
- 由于您正在执行 i/o,因此实现可能应该是异步的
- 这将在负载下表现不佳。特别是随着负载不断增加,批处理将变得更大,因此执行速度变慢,而较慢的批处理执行反过来又会给下一个额外的时间来累积项目,从而使它们变得更慢,等等......最终写入批处理将失败(如果您达到 sql 限制或查询超时)或应用程序将内存不足。要处理高负载,您实际上只有两个选择,即应用背压(即减慢生产者的速度)或放弃写入。
- 如果数据库可以处理它,您可能希望允许有限数量的并发写入者。
- 场上的比赛条件
disposed
可能会导致ObjectDisposedException
inwriteTimer.Change
。
我认为解决上述问题的更好模式是消费者-生产者模式,您可以使用 ConcurrentQueue 或新的 System.Threading.Channels api 在 .net 中实现它。
还要记住,如果您的应用程序因任何原因崩溃,您将丢失仍在缓冲的记录。
这是使用通道的示例实现:
public interface IWriter<in T>
{
ValueTask WriteAsync(IEnumerable<T> items);
}
public sealed record Options(int BatchSize, TimeSpan Interval, int MaxPendingWrites, int Concurrency);
public class BatchWriter<T> : IWriter<T>, IAsyncDisposable
{
readonly IWriter<T> writer;
readonly Options options;
readonly Channel<T> channel;
readonly Task[] consumers;
public BatchWriter(IWriter<T> writer, Options options)
{
this.writer = writer;
this.options = options;
channel = Channel.CreateBounded<T>(new BoundedChannelOptions(options.MaxPendingWrites)
{
// Choose between backpressure (Wait) or
// various ways to drop writes (DropNewest, DropOldest, DropWrite).
FullMode = BoundedChannelFullMode.Wait,
SingleWriter = false,
SingleReader = options.Concurrency == 1
});
consumers = Enumerable.Range(start: 0, options.Concurrency)
.Select(_ => Task.Run(Start))
.ToArray();
}
async Task Start()
{
var batch = new List<T>(options.BatchSize);
var timer = Task.Delay(options.Interval);
var canRead = channel.Reader.WaitToReadAsync().AsTask();
while (true)
{
if (await Task.WhenAny(timer, canRead) == timer)
{
timer = Task.Delay(options.Interval);
await Flush(batch);
}
else if (await canRead)
{
while (channel.Reader.TryRead(out var item))
{
batch.Add(item);
if (batch.Count == options.BatchSize)
{
await Flush(batch);
}
}
canRead = channel.Reader.WaitToReadAsync().AsTask();
}
else
{
await Flush(batch);
return;
}
}
async Task Flush(ICollection<T> items)
{
if (items.Count > 0)
{
await writer.WriteAsync(items);
items.Clear();
}
}
}
public async ValueTask WriteAsync(IEnumerable<T> items)
{
foreach (var item in items)
{
await channel.Writer.WriteAsync(item);
}
}
public async ValueTask DisposeAsync()
{
channel.Writer.Complete();
await Task.WhenAll(consumers);
}
}
推荐阅读
- javascript - 将 Web 应用程序部署到 Heroku 时找不到 Express 模块
- r - R中向量上标的含义
- r - 在 R 中按组汇总结果
- java - 如何从 Spring WebFlux 反应式中的 ServerRequest 对象获取请求正文?
- java - 如何将 tiff 文件格式的 Oracle Blob 转换为 jpeg 文件格式的 Blob
- json - 通过命令行更新气流中的命名环境变量
- html - Angular - 如何模拟 HttpErrorResponse?
- c++ - 计算 md5 哈希时输出混乱(新手)
- sql - 创建自定义字段 SQL 周 -- Oracle 12c
- r - Databricks 上的 R-Studio IDE