c# - N 个线程中有 1 个从未加入
问题描述
state == Running
我有线程池实现,每当我尝试停止/加入池时,池中总会有一个随机线程在我调用池时不会停止( ) Stop()
。
我不明白为什么,我只有一把锁,我通知任何可能被阻止等待的Dequeue
人Monitor.PulseAll
in Stop
。调试器清楚地表明他们中的大多数人都收到了消息,它总是 N 中的 1 个仍在运行......
这是池的最小实现
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace MultiThreading
{
public class WorkerHub
{
private readonly object _listMutex = new object();
private readonly Queue<TaskWrapper> _taskQueue;
private readonly List<Thread> _threads;
private int _runCondition;
private readonly Dictionary<string, int> _statistics;
public WorkerHub(int count = 4)
{
_statistics = new Dictionary<string, int>();
_taskQueue = new Queue<TaskWrapper>();
_threads = new List<Thread>();
InitializeThreads(count);
}
private bool ShouldRun
{
get => Interlocked.CompareExchange(ref _runCondition, 1, 1) == 1;
set
{
if (value)
Interlocked.CompareExchange(ref _runCondition, 1, 0);
else
Interlocked.CompareExchange(ref _runCondition, 0, 1);
}
}
private void InitializeThreads(int count)
{
Action threadHandler = () =>
{
while (ShouldRun)
{
var wrapper = Dequeue();
if (wrapper != null)
{
wrapper.FunctionBinding.Invoke();
_statistics[Thread.CurrentThread.Name] += 1;
}
}
};
for (var i = 0; i < count; ++i)
{
var t = new Thread(() => { threadHandler.Invoke(); });
t.Name = $"WorkerHub Thread#{i}";
_statistics[t.Name] = 0;
_threads.Add(t);
}
}
public Task Enqueue(Action work)
{
var tcs = new TaskCompletionSource<bool>();
var wrapper = new TaskWrapper();
Action workInvoker = () =>
{
try
{
work.Invoke();
tcs.TrySetResult(true);
}
catch (Exception e)
{
tcs.TrySetException(e);
}
};
Action workCanceler = () => { tcs.TrySetCanceled(); };
wrapper.FunctionBinding = workInvoker;
wrapper.CancelBinding = workCanceler;
lock (_taskQueue)
{
_taskQueue.Enqueue(wrapper);
Monitor.PulseAll(_taskQueue);
}
return tcs.Task;
}
private TaskWrapper Dequeue()
{
lock (_listMutex)
{
while (_taskQueue.Count == 0)
{
if (!ShouldRun)
return null;
Monitor.Wait(_listMutex);
}
_taskQueue.TryDequeue(out var wrapper);
return wrapper;
}
}
public void Stop()
{
ShouldRun = false;
//Wake up whoever is waiting for dequeue
lock (_listMutex)
{
Monitor.PulseAll(_listMutex);
}
foreach (var thread in _threads)
{
thread.Join();
}
var sum = _statistics.Sum(pair => pair.Value) * 1.0;
foreach (var stat in _statistics)
{
Console.WriteLine($"{stat.Key} ran {stat.Value} functions, {stat.Value/sum * 100} percent of the total.");
}
}
public void Start()
{
ShouldRun = true;
foreach (var thread in _threads) thread.Start();
}
}
}
通过试运行
public static async Task Main(string[] args)
{
var hub = new WorkerHub();
var tasks = Enumerable.Range(0, (int) 100).Select(x => hub.Enqueue(() => Sum(x)))
.ToArray();
var sw = new Stopwatch();
sw.Start();
hub.Start();
await Task.WhenAll(tasks);
hub.Stop();
sw.Start();
Console.WriteLine($"Work took: {sw.ElapsedMilliseconds}ms.");
}
public static int Sum(int n)
{
var sum = 0;
for (var i = 0; i <= n; ++i) sum += i;
Console.WriteLine($"Sum of numbers up to {n} is {sum}");
return sum;
}
我错过了一些基本的东西吗?请注意,这不是生产代码(phew),而是我缺少的东西,所以您可能会发现不止 1 个问题 :)
解决方案
起初我无法重现您的 MCVE,因为我以非异步方式运行它Main()
...
如果您在调用时查看“线程”调试窗口,hub.Stop();
您应该会看到执行已切换到您的工作线程之一。这就是为什么一个工作线程没有响应的原因。
我认为它与这里描述的问题有关。
切换Enqueue(Action work)
到使用TaskCreationOptions.RunContinuationsAsynchronously
应该可以解决它:
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
[编辑]
避免该问题的更好方法可能是将直接线程管理换成使用任务(这不是您当前代码的适当替代品,只是想展示这个想法):
public class TaskWorkerHub
{
ConcurrentQueue<Action> workQueue = new ConcurrentQueue<Action>();
int concurrentTasks;
CancellationTokenSource cancelSource;
List<Task> workers = new List<Task>();
private async Task Worker(CancellationToken cancelToken)
{
while (workQueue.TryDequeue(out var workTuple))
{
await Task.Run(workTuple, cancelToken);
}
}
public TaskWorkerHub(int concurrentTasks = 4)
{
this.concurrentTasks = concurrentTasks;
}
public void Enqueue(Action work) => workQueue.Enqueue(work);
public void Start()
{
cancelSource = new CancellationTokenSource();
for (int i = 0; i < concurrentTasks; i++)
{
workers.Add(Worker(cancelSource.Token));
}
}
public void Stop() => cancelSource.Cancel();
public Task WaitAsync() => Task.WhenAll(workers);
}
推荐阅读
- arrays - Google 表格 - 对列中的每一行执行 VLOOKUP 并返回总和
- python - Mapbox Choropleth 地图未显示
- reactjs - ReactJS 前端与 Azure AD B2C 错误 UnsupportedAuthorityValidation
- python - 尝试使用请求打包 python 3.7 脚本时出现 SSL 模块错误
- python - pandas:groupby 2列,保持所有行具有唯一值
- sql - 过滤掉雪花中的空字符串
- python - 如何在 django 的 urls.py 中编写 url?
- python - 有没有办法在异步 python 中等待值
- python - 根据 str.split 的多个部分分配列名
- elasticsearch - Elasticsearch - 不支持小于 -1 字节的值查询节点