首页 > 解决方案 > N 个线程中有 1 个从未加入

问题描述

state == Running我有线程池实现,每当我尝试停止/加入池时,池中总会有一个随机线程在我调用池时不会停止( ) Stop()

我不明白为什么,我只有一把锁,我通知任何可能被阻止等待的DequeueMonitor.PulseAllin Stop。调试器清楚地表明他们中的大多数人都收到了消息,它总是 N 中的 1 个仍在运行......

这是池的最小实现

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace MultiThreading
{
    public class WorkerHub
    {
        private readonly object _listMutex = new object();
        private readonly Queue<TaskWrapper> _taskQueue;
        private readonly List<Thread> _threads;
        private int _runCondition;
        private readonly Dictionary<string, int> _statistics;

        public WorkerHub(int count = 4)
        {
            _statistics = new Dictionary<string, int>();
            _taskQueue = new Queue<TaskWrapper>();
            _threads = new List<Thread>();
            InitializeThreads(count);
        }

        private bool ShouldRun
        {
            get => Interlocked.CompareExchange(ref _runCondition, 1, 1) == 1;
            set
            {
                if (value)
                    Interlocked.CompareExchange(ref _runCondition, 1, 0);
                else
                    Interlocked.CompareExchange(ref _runCondition, 0, 1);
            }
        }

        private void InitializeThreads(int count)
        {
            Action threadHandler = () =>
            {
                while (ShouldRun)
                {
                    var wrapper = Dequeue();
                    if (wrapper != null)
                    {
                        wrapper.FunctionBinding.Invoke();
                        _statistics[Thread.CurrentThread.Name] += 1;
                    }
                }
            };

            for (var i = 0; i < count; ++i)
            {
                var t = new Thread(() => { threadHandler.Invoke(); });
                t.Name = $"WorkerHub Thread#{i}";
                _statistics[t.Name] = 0;
                _threads.Add(t);
            }
        }


        public Task Enqueue(Action work)
        {
            var tcs = new TaskCompletionSource<bool>();
            var wrapper = new TaskWrapper();

            Action workInvoker = () =>
            {
                try
                {
                    work.Invoke();
                    tcs.TrySetResult(true);
                }
                catch (Exception e)
                {
                    tcs.TrySetException(e);
                }
            };
            Action workCanceler = () => { tcs.TrySetCanceled(); };
            wrapper.FunctionBinding = workInvoker;
            wrapper.CancelBinding = workCanceler;


            lock (_taskQueue)
            {
                _taskQueue.Enqueue(wrapper);
                Monitor.PulseAll(_taskQueue);
            }


            return tcs.Task;
        }

        private TaskWrapper Dequeue()
        {
            lock (_listMutex)
            {
                while (_taskQueue.Count == 0)
                {
                    if (!ShouldRun)
                        return null;
                    Monitor.Wait(_listMutex);
                }

                _taskQueue.TryDequeue(out var wrapper);
                return wrapper;
            }
        }

        public void Stop()
        {
            ShouldRun = false;

            //Wake up whoever is waiting for dequeue
            lock (_listMutex)
            {
                Monitor.PulseAll(_listMutex);
            }

            foreach (var thread in _threads)
            {
                thread.Join();
            }
            var sum = _statistics.Sum(pair => pair.Value) * 1.0;
            foreach (var stat in _statistics)
            {
                Console.WriteLine($"{stat.Key} ran {stat.Value} functions, {stat.Value/sum * 100} percent of the total.");
            }
        }

        public void Start()
        {
            ShouldRun = true;
            foreach (var thread in _threads) thread.Start();
        }
    }
}

通过试运行

public static async Task Main(string[] args)
    {
        var hub = new WorkerHub();
        var tasks = Enumerable.Range(0, (int) 100).Select(x => hub.Enqueue(() => Sum(x)))
            .ToArray();
        var sw = new Stopwatch();
        sw.Start();
        hub.Start();
        await Task.WhenAll(tasks);
        hub.Stop();
        sw.Start();
        Console.WriteLine($"Work took: {sw.ElapsedMilliseconds}ms.");
    }

    public static int Sum(int n)
    {
        var sum = 0;
        for (var i = 0; i <= n; ++i) sum += i;
        Console.WriteLine($"Sum of numbers up to {n} is {sum}");
        return sum;
    }

我错过了一些基本的东西吗?请注意,这不是生产代码(phew),而是我缺少的东西,所以您可能会发现不止 1 个问题 :)

标签: c#multithreading

解决方案


起初我无法重现您的 MCVE,因为我以非异步方式运行它Main()...

如果您在调用时查看“线程”调试窗口,hub.Stop();您应该会看到执行已切换到您的工作线程之一。这就是为什么一个工作线程没有响应的原因。

我认为它与这里描述的问题有关。

切换Enqueue(Action work)到使用TaskCreationOptions.RunContinuationsAsynchronously应该可以解决它:

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

[编辑]

避免该问题的更好方法可能是将直接线程管理换成使用任务(这不是您当前代码的适当替代品,只是想展示这个想法):

public class TaskWorkerHub
{
    ConcurrentQueue<Action> workQueue = new ConcurrentQueue<Action>();
    int concurrentTasks;
    CancellationTokenSource cancelSource;
    List<Task> workers = new List<Task>();

    private async Task Worker(CancellationToken cancelToken)
    {
        while (workQueue.TryDequeue(out var workTuple))
        {
            await Task.Run(workTuple, cancelToken);
        }
    }

    public TaskWorkerHub(int concurrentTasks = 4)
    {
        this.concurrentTasks = concurrentTasks;
    }

    public void Enqueue(Action work) => workQueue.Enqueue(work);

    public void Start()
    {
        cancelSource  = new CancellationTokenSource();

        for (int i = 0; i < concurrentTasks; i++)
        {        
            workers.Add(Worker(cancelSource.Token));
        }
    }

    public void Stop() => cancelSource.Cancel();

    public Task WaitAsync() => Task.WhenAll(workers);    
}

推荐阅读