c# - 一种将传入的并行请求打包成一个的模式
问题描述
假设我们有许多随机传入的线程并行访问相同的资源。要访问资源线程需要获取锁。如果我们可以将 N 个传入线程打包到一个请求中,资源使用效率将提高 N 倍。此外,我们需要尽快回复个人请求。在 C# 中做到这一点的最佳方式/模式是什么?
目前我有这样的事情:
//batches lock
var ilock = ModifyBatch.GetTableDeleteBatchLock(table_info.Name);
lock (ilock)
{
// put the request into requests batch
if (!ModifyBatch._delete_batch.ContainsKey(table_info.Name))
{
ModifyBatch._delete_batch[table_info.Name] = new DeleteData() { Callbacks = new List<Action<string>>(), ids = ids };
}
else
{
ModifyBatch._delete_batch[table_info.Name].ids.UnionWith(ids);
}
//this callback will get called once the job is done by a thread that will acquire resource lock
ModifyBatch._delete_batch[table_info.Name].Callbacks.Add(f =>
{
done = true;
error = f;
});
}
bool lockAcquired = false;
int maxWaitMs = 60000;
DeleteData _delete_data = null;
//resource lock
var _write_lock = GetTableWriteLock(typeof(T).Name);
try
{
DateTime start = DateTime.Now;
while (!done)
{
lockAcquired = Monitor.TryEnter(_write_lock, 100);
if (lockAcquired)
{
if (done) //some other thread did our job
{
Monitor.Exit(_write_lock);
lockAcquired = false;
break;
}
else
{
break;
}
}
Thread.Sleep(100);
if ((DateTime.Now - start).TotalMilliseconds > maxWaitMs)
{
throw new Exception("Waited too long to acquire write lock?");
}
}
if (done) //some other thread did our job
{
if (!string.IsNullOrEmpty(error))
{
throw new Exception(error);
}
else
{
return;
}
}
//not done, but have write lock for the table
lock (ilock)
{
_delete_data = ModifyBatch._delete_batch[table_info.Name];
var oval = new DeleteData();
ModifyBatch._delete_batch.TryRemove(table_info.Name, out oval);
}
if (_delete_data.ids.Any())
{
//doing the work with resource
}
foreach (var cb in _delete_data.Callbacks)
{
cb(null);
}
}
catch (Exception ex)
{
if (_delete_data != null)
{
foreach (var cb in _delete_data.Callbacks)
{
cb(ex.Message);
}
}
throw;
}
finally
{
if (lockAcquired)
{
Monitor.Exit(_write_lock);
}
}
解决方案
如果可以处理当前请求范围之外的任务,即将它排队等待以后,那么你可以考虑这样的序列1:
实现一个资源锁(监视器)和一个List
任务。
对于每个请求:
锁定列表,将当前任务添加到列表中,记住 nr。列表中的任务,解锁列表。
尝试获取锁。
如果不成功:
- 如果nr。列表中的任务数 < 阈值 X,然后返回。
- 否则获取锁(将阻止)
锁定列表,将其内容移动到临时列表,解锁列表。
如果临时列表不为空
执行临时列表中的任务。
从第 5 步开始重复。
松开锁。
第一个请求将通过整个序列。后续请求(如果第一个请求仍在执行)将在第 4 步短路。
调整最佳阈值 X(或将其更改为基于时间的阈值)。
1如果需要等待请求范围内的任务,那么需要稍微扩展一下流程:
向 Task 类添加两个字段:完成标志和异常。
在第 4 步,在 Returning 之前,等待任务完成 ( Monitor.Wait
) 直到其完成标志变为true
。如果异常不是null
,抛出它。
在第 6 步,为每个任务设置完成标志和可选的异常,然后通知服务员 ( Monitor.PulseAll
)。
推荐阅读
- flowtype - 如何在 React HOC 中描述流移除道具
- javascript - js 事件,如果用户点击浏览器的上下文菜单
- circuit-sdk - 电路 API 问题
- callback - RabbitMQ 是否为消费者保持开放连接?
- javascript - 当我将目的地设置为另存为 PDF 时,window.print() 不显示更多设置
- swift - 在模拟器上运行时出现链接器错误,可以在设备上运行,有人可以帮我解决这个问题吗?
- linux - 线程级并行 VS 进程级并行
- android - 隐形视图的触控能力
- javascript - 如何设计前端架构来处理 aem 中的多个品牌?
- java - Spring Boot 休息控制器未映射