c# - Semaphore blocks even though it is not full
Problem description
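I am currently trying to optimize an old, badly written class that processes a lot of data, so running one data set can easily take several hours. Collecting the data already takes a lot of that time, and that is the part I am trying to improve here. I know this is pretty smelly code, but this is only a test of whether this improves anything at all, so please focus on the problem: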
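I tried SemaphoreSlim and Semaphore to reduce the number of tasks running at the same time. My data set generates about 70 tasks, which can lead to thread starvation and lower overall performance. At the very least, the application became noticeably less responsive. So I am trying to limit it to 5 concurrent tasks to get better overall throughput.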
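Now, when my tasks try to enter the semaphore, they block (SemaphoreSlim with await blocks as well), and no task ever enters, even though the semaphore is not full. As a bit of context: this code is inside an async method.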
Semaphore throttle = new Semaphore(0, 5);
try
{
    foreach (var folder in folders)
    {
        // Wait in case there are already 5 tasks running to reduce thread starvation
        collectionTasks.Add(Task.Run(() =>
        {
            // ReSharper disable once AccessToDisposedClosure
            throttle.WaitOne();
            return GetGapProfiles(folder.Value, progress, token);
        }, token).ContinueWith(
            t =>
            {
                // ReSharper disable once AccessToDisposedClosure
                throttle.Release();
                return t.Result;
            }, TaskContinuationOptions.None));
    }
    // When all are loaded concat all results into one collection
    await Task.WhenAll(collectionTasks);
}
catch (Exception ex)
{
    Log.Error(ex, "Failed to collect profiles.");
}
finally
{
    throttle.Dispose();
}
I just don't understand why this blocks and never enters GetGapProfiles. Can anyone explain?
Solution
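The blocking comes from the constructor call `new Semaphore(0, 5)`. Its first argument is the initial count, i.e. how many slots are free when the semaphore is created; the second is only the maximum the count may ever reach. With an initial count of 0 no slot is free, so from the semaphore's point of view it is already "full": every `WaitOne()` waits for a `Release()`, and the only `Release()` calls sit in continuations that run after `GetGapProfiles`, which is never reached. `SemaphoreSlim(int initialCount, int maxCount)` has the same meaning, so presumably the `SemaphoreSlim` attempt was created the same way. Creating it as `new Semaphore(5, 5)` (or `new SemaphoreSlim(5, 5)`) starts with five free slots. A minimal sketch of the corrected pattern follows; the `ThrottledCollector.RunThrottledAsync` name is hypothetical, and it assumes the per-item work is available as a `Func<TItem, CancellationToken, Task<TResult>>` (for example an async wrapper around `GetGapProfiles`). It also waits with `WaitAsync` instead of calling `WaitOne()` inside `Task.Run`, so queued items do not each hold a thread-pool thread while they wait.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original question or answer.
public static class ThrottledCollector
{
    // Runs `work` once per item with at most `maxConcurrency` calls in flight.
    // Results come back in the same order as `items`.
    public static async Task<TResult[]> RunThrottledAsync<TItem, TResult>(
        IEnumerable<TItem> items,
        Func<TItem, CancellationToken, Task<TResult>> work,
        int maxConcurrency,
        CancellationToken token = default)
    {
        // initialCount == maxCount: every slot is free at the start.
        // new SemaphoreSlim(0, maxConcurrency) would start with no free slot.
        using var throttle = new SemaphoreSlim(maxConcurrency, maxConcurrency);

        List<Task<TResult>> tasks = items.Select(async item =>
        {
            // Waits asynchronously, so a queued item does not block a thread-pool thread.
            await throttle.WaitAsync(token).ConfigureAwait(false);
            try
            {
                return await work(item, token).ConfigureAwait(false);
            }
            finally
            {
                // Frees the slot whether `work` succeeded, failed or was cancelled.
                throttle.Release();
            }
        }).ToList();

        // WhenAll completes only after every task has finished (successfully or not),
        // so disposing `throttle` when the method returns is safe.
        return await Task.WhenAll(tasks).ConfigureAwait(false);
    }
}
```

Unlike the loop in the question, a failure inside `work` surfaces as an exception from the awaited call instead of being hidden behind `t.Result` in a continuation. The answer's more general helper, which returns the results keyed by input item, follows: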
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class PerTaskThrottle
{
    /// <summary>
    /// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
    /// </summary>
    /// <typeparam name="TInput">Type of the source items; each item is used as a dictionary key.</typeparam>
    /// <typeparam name="TResult">Result type produced by <paramref name="func"/>.</typeparam>
    /// <param name="sourceItems">Items to process.</param>
    /// <param name="func">Starts the asynchronous work for one item.</param>
    /// <param name="concurrentTasks">Maximum number of tasks in flight at once.</param>
    /// <returns>A dictionary mapping each successfully processed item to its result.</returns>
    public static Task<IDictionary<TInput, TResult>> ForEachAsyncThrottled<TInput, TResult>(
        this IEnumerable<TInput> sourceItems,
        Func<TInput, Task<TResult>> func,
        int concurrentTasks = 1)
    {
        return ForEachAsyncThrottled(sourceItems, func, CancellationToken.None, concurrentTasks);
    }

    /// <summary>
    /// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
    /// </summary>
    /// <typeparam name="TInput">Type of the source items; each item is used as a dictionary key.</typeparam>
    /// <typeparam name="TResult">Result type produced by <paramref name="func"/>.</typeparam>
    /// <param name="sourceItems">Items to process.</param>
    /// <param name="func">Starts the asynchronous work for one item.</param>
    /// <param name="token">Stops starting new items when cancelled.</param>
    /// <param name="concurrentTasks">Maximum number of tasks in flight at once.</param>
    /// <returns>A dictionary mapping each successfully processed item to its result.</returns>
    public static async Task<IDictionary<TInput, TResult>> ForEachAsyncThrottled<TInput, TResult>(
        this IEnumerable<TInput> sourceItems,
        Func<TInput, Task<TResult>> func,
        CancellationToken token,
        int concurrentTasks = 1)
    {
        var result = new ConcurrentDictionary<TInput, TResult>();
        var tasksList = new List<Task>();
        // Initial count == maximum count: all concurrentTasks slots are free at the start.
        using (var semaphoreSlim = new SemaphoreSlim(concurrentTasks, concurrentTasks))
        {
            foreach (var item in sourceItems)
            {
                // If there are already concurrentTasks tasks executing, pause until one has
                // completed (semaphoreSlim.Release()). Throws OperationCanceledException on cancel.
                await semaphoreSlim.WaitAsync(token).ConfigureAwait(false);

                // Runs only if the task ran to completion, so task.Result cannot throw here.
                // Items whose task failed are simply missing from the result.
                Action<Task<TResult>> okContinuation = task => result[item] = task.Result;

                // Runs in every case (success, failure or cancellation) and frees the slot.
                // ReSharper disable once AccessToDisposedClosure
                Action<Task> allContinuation = task => semaphoreSlim.Release();

                tasksList.Add(func.Invoke(item)
                    .ContinueWith(okContinuation, TaskContinuationOptions.OnlyOnRanToCompletion)
                    .ContinueWith(allContinuation));
            }

            // Wait for every started task before the semaphore is disposed.
            await Task.WhenAll(tasksList).ConfigureAwait(false);
        }
        return result;
    }
}
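So in your case you could use the following (assuming GetGapProfiles returns a Task, since func must produce a Task<TResult>):
var results = await folders.ForEachAsyncThrottled(f => GetGapProfiles(f.Value, progress, token), token, 5);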
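On .NET 6 or later, the same throttling is available without a hand-written semaphore through `Parallel.ForEachAsync` with `ParallelOptions.MaxDegreeOfParallelism`. This is a different technique from the answer's helper, offered only as an alternative; the `ParallelCollector.CollectAsync` name is hypothetical. This sketch keeps the dictionary-shaped result of `ForEachAsyncThrottled`:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper; requires .NET 6 or later for Parallel.ForEachAsync.
public static class ParallelCollector
{
    public static async Task<IDictionary<TInput, TResult>> CollectAsync<TInput, TResult>(
        IEnumerable<TInput> items,
        Func<TInput, CancellationToken, Task<TResult>> work,
        int maxConcurrency,
        CancellationToken token = default)
        where TInput : notnull
    {
        var results = new ConcurrentDictionary<TInput, TResult>();
        var options = new ParallelOptions
        {
            // At most this many bodies run at the same time.
            MaxDegreeOfParallelism = maxConcurrency,
            CancellationToken = token
        };

        // The body receives a token linked to `options` and returns a ValueTask.
        await Parallel.ForEachAsync(items, options, async (item, ct) =>
        {
            results[item] = await work(item, ct).ConfigureAwait(false);
        }).ConfigureAwait(false);

        return results;
    }
}
```

One design difference: `ForEachAsyncThrottled` silently drops items whose task failed, whereas `Parallel.ForEachAsync` stops starting new items and the returned task faults, so the error reaches the caller's `catch`. With it, the call above might become `await ParallelCollector.CollectAsync(folders, (f, ct) => GetGapProfiles(f.Value, progress, ct), 5, token)`, again assuming `GetGapProfiles` returns a `Task`.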