首页 > 解决方案 > 一种将传入的并行请求打包成一个的模式

问题描述

假设我们有许多随机传入的线程并行访问相同的资源。要访问资源线程需要获取锁。如果我们可以将 N 个传入线程打包到一个请求中,资源使用效率将提高 N 倍。此外,我们需要尽快回复个人请求。在 C# 中做到这一点的最佳方式/模式是什么?

目前我有这样的事情:

//batches lock
var ilock = ModifyBatch.GetTableDeleteBatchLock(table_info.Name);
lock (ilock)
{
    // put the request into requests batch
    if (!ModifyBatch._delete_batch.ContainsKey(table_info.Name))
    {
        ModifyBatch._delete_batch[table_info.Name] = new DeleteData() { Callbacks = new List<Action<string>>(), ids = ids };
    }
    else
    {
        ModifyBatch._delete_batch[table_info.Name].ids.UnionWith(ids);
    }
    //this callback will get called once the job is done by a thread that will acquire resource lock
    ModifyBatch._delete_batch[table_info.Name].Callbacks.Add(f =>
    {
        done = true;
        error = f;
    });
}

bool lockAcquired = false;
int maxWaitMs = 60000;
DeleteData _delete_data = null;

//resource lock
var _write_lock = GetTableWriteLock(typeof(T).Name);
try
{
    DateTime start = DateTime.Now;
    while (!done)
    {
        lockAcquired = Monitor.TryEnter(_write_lock, 100);
        if (lockAcquired)
        {
            if (done) //some other thread did our job
                            {
                Monitor.Exit(_write_lock);
                lockAcquired = false;
                break;
            }
            else
            {
                break;
            }
        }
        Thread.Sleep(100);
        if ((DateTime.Now - start).TotalMilliseconds > maxWaitMs)
        {
            throw new Exception("Waited too long to acquire write lock?");
        }
    }
    if (done) //some other thread did our job
    {
        if (!string.IsNullOrEmpty(error))
        {
            throw new Exception(error);
        }
        else
        {
            return;
        }
    }

    //not done, but have write lock for the table
    lock (ilock)
    {
        _delete_data = ModifyBatch._delete_batch[table_info.Name];
        var oval = new DeleteData();
        ModifyBatch._delete_batch.TryRemove(table_info.Name, out oval);
    }
    if (_delete_data.ids.Any())
    {
        //doing the work with resource 
    }
    foreach (var cb in _delete_data.Callbacks)
    {
        cb(null);
    }
}
catch (Exception ex)
{
    if (_delete_data != null)
    {
        foreach (var cb in _delete_data.Callbacks)
        {
            cb(ex.Message);
        }
    }
    throw;
}
finally
{
    if (lockAcquired)
    {
        Monitor.Exit(_write_lock);
    }
}

标签: c#performanceoptimizationdesign-patternsparallel-processing

解决方案


如果可以处理当前请求范围之外的任务,即将它排队等待以后,那么你可以考虑这样的序列1

实现一个资源(监视器)和一个List任务。

  1. 对于每个请求:

  2. 锁定列表,将当前任务添加到列表中,记住 nr。列表中的任务,解锁列表。

  3. 尝试获取

  4. 如果不成功:

    • 如果nr。列表中的任务数 < 阈值 X,然后返回。
    • 否则获取锁(将阻止)
  5. 锁定列表,将其内容移动到临时列表,解锁列表。

  6. 如果临时列表不为空

    • 执行临时列表中的任务。

    • 从第 5 步开始重复。

  7. 开锁

第一个请求将通过整个序列。后续请求(如果第一个请求仍在执行)将在第 4 步短路。

调整最佳阈值 X(或将其更改为基于时间的阈值)。


1如果需要等待请求范围内的任务,那么需要稍微扩展一下流程:

向 Task 类添加两个字段:完成标志异常

在第 4 步,在 Returning 之前,等待任务完成 ( Monitor.Wait) 直到其完成标志变为true。如果异常不是null,抛出它。

在第 6 步,为每个任务设置完成标志和可选的异常,然后通知服务员 ( Monitor.PulseAll)。


推荐阅读