首页 > 技术文章 > C#多线程和线程同步总结

birds-zhu 2017-11-15 15:32 原文

Thread

没有参数的线程启动

Thread newThread = new Thread(new ThreadStart(DoWork));

newThread.Start();

  

有参数的线程启动

注意DoWork()的参数必须为object

Thread newThread2 = new Thread(new ParameterizedThreadStart(this.DoWork)); newThread2.Start("111");

  

AutoResetEvent

通知等待的其他线程,本线程已经工作做完.

比如要计算1-N的各个数的平方之和,每个数的平方由不同的线程去计算

/// <summary>
/// 求数组平方和
/// </summary>
public class GetSquareSum
{
    //建立事件数组  ,N个线程,就N个AutoResetEvent
    public AutoResetEvent[] autoEvents = null;  
     //数组  
    public int[] array = null;

    public int Sum = 0;

    public GetSquareSum(int []arrays)
    {
        this.array = arrays;
        autoEvents = new AutoResetEvent[arrays.Length];
        for (int i = 0; i < arrays.Length; i++)
        {
            autoEvents[i] = new AutoResetEvent(false);//初始化
        }
        this.GetResult();
        WaitHandle.WaitAll(autoEvents);
        //autoEvents.ToList().ForEach(s => s.WaitOne());
        foreach (int r in array)
        {
            Sum += r;
        }
        Console.WriteLine("Sum="+Sum);
    }

    public void GetResult()
    {
        for (int i = 0; i < array.Length; i++)
        {
            Calculate(i);
        }
    }

    /// <summary>
    /// 计算第index个数
    /// </summary>
    /// <param name="index"></param>
    public void Calculate(int index)
    {
            Thread newThread2 = new Thread(
                new ParameterizedThreadStart(
                    (obj) =>
                    {
                        int j = (int)obj;
                        array[j] = array[j] * array[j];
                        Console.WriteLine(array[j]);
                        autoEvents[j].Set();
                    }
                    )
                );
            newThread2.Start(index);


    }
}

  

ManualResetEvent

MSDN上的解释是:通知一个或多个正在等待的线程已发生事件。

例子解释:

  • 打印方法已经准备好,但是打印的东西没准备好,所以打印之前mreInit.WaitOne(),等待资源
  • 通过控制台输入资源,mreInit.Set()通知资源准备好,打印程序开始打印

    public class ManualResetEventTestExample2
    {
    
    private static ManualResetEvent mreInit;
    
    private string _test = "";
    public void Test()
    {
    
        mreInit = new ManualResetEvent(false);//
        Thread newThread = new Thread(new ThreadStart(() => Print()));
        newThread.Start();
    
        _test = (Console.ReadLine());
        mreInit.Set();
    }
    
    
    /// <summary>
    /// 打印程序准备就绪
    /// </summary>
    private void Print()
    {
        mreInit.WaitOne();
        Console.WriteLine(_test);
    }
    }
    

      

ManualResetEvent的reset看下面代码

public class ManualResetEventTestExample2
{
    private static ManualResetEvent mreInit;

    private string _test = "";
    public void Test()
    {

        mreInit = new ManualResetEvent(false);//
         new Thread(new ThreadStart(() => Print())).Start();//Print 1
        _test = (Console.ReadLine());//Input 1
        mreInit.Set();


        mreInit.Reset();
        new Thread(new ThreadStart(() => Print())).Start();//Print 2
        new Thread(new ThreadStart(() => Print())).Start();//Print 3
        new Thread(new ThreadStart(() => Print())).Start();//Print 4
        new Thread(new ThreadStart(() => Print())).Start();//Print 5

        _test = (Console.ReadLine());//Input 2
        mreInit.Set();
    }


    /// <summary>
    /// 打印程序准备就绪
    /// </summary>
    private void Print()
    {
        mreInit.WaitOne();
        Console.WriteLine(_test);
    }

  

print 1肯定会在输入后执行,Print 2,3,4,5的执行,则依赖reset.如果没有mreInit.Reset()这句,则Print1,2,3,4,5会直接依次执行.如果加了mreInit.Reset()这句,这Print2,3,4,5只有在input 2执行了之后才会执行.

Reset就是让ManualResetEvent回复到初始状态.

线程池 ThreadPool

public static bool QueueUserWorkItem (WaitCallback callBack);
public static bool QueueUserWorkItem(WaitCallback callback, Object state);

  

这两个方法向线程池的队列添加一个工作项(work item)以及一个可选的状态数据。然后,这两个方法就会立即返回。

工作项其实就是由callback参数标识的一个方法,该方法将由线程池线程执行。 同时写的回调方法必须匹配System.Threading.WaitCallback委托类型,定义为:

public delegate void WaitCallback(Object state);

  

就是说如果要带参数,callback参数必须为object.

 ThreadPool.QueueUserWorkItem(new WaitCallback(obj => { System.Threading.Thread.Sleep(1000); Console.WriteLine(obj.ToString()); }),"canshu");

  

或者这么写

 ThreadPool.QueueUserWorkItem(new WaitCallback(Func), "canshu");
 private void Func(object obj)
    {
        System.Threading.Thread.Sleep(1000); 
        Console.WriteLine(obj.ToString());
    }

  

或者

ThreadPool.QueueUserWorkItem(a=>{Func(a);}, "canshu");

  

以下代码改造求平方的问题.

class ThreadPoolTest2
{
    public void Test()
    { // List<int> result = new List<int>();
        int sum = 0;
        object lockobj = new object();
        Action<object> action = obj =>
        {
            Parm p = (Parm)obj;
            lock (lockobj)
            {
                sum += p.Num * p.Num;
            }
            p.Are.Set();

        };

        int[] array = new int[] { 1, 2, 3, 4, 5,6,7,8,9,10,11,12,13,14,15 };

        AutoResetEvent[] autos = new AutoResetEvent[array.Length];
        for (int i = 0; i < array.Length; i++)
        {
            autos[i] = new AutoResetEvent(false);
            ThreadPool.QueueUserWorkItem(a=>action(a), new Parm() { Are = autos[i], Num = array[i] });
        }
        Console.WriteLine(sum);
        //WaitHandle.WaitAll(autos);
        autos.ToList().ForEach(s=>s.WaitOne());
        Console.WriteLine(sum);
    }

    class Parm
    {
        public int Num { set; get; }
        public AutoResetEvent Are { set; get; }
    }
}

  

注意:在sum += p.Num * p.Num这句必须lock,不然会得不到正确结果

CancellationTokenSource

例子讲解:有5个数组,需要在数组中找是否有0元素,找到则返回.

实现:每个数据开启一个现场,如果某一个线程先找到0,则返回,同时,通知其他线程不用计算了.

class CancellationTokenSourceTest2
{
    public void Test()
    {
        CancellationTokenSource cts = new CancellationTokenSource();
        CancellationToken token = cts.Token;
        int[][] array = new int[][]
        {
            new []{1,2,3,0,5,6,7,8,9,10},
            new []{1,2,3,4,5,6,7,8,9,0},
            new []{1,2,3,4,5,6,7,8,0,10},
            new []{1,2,3,4,5,6,0,8,9,10},
            new []{1,2,3,4,5,0,7,8,9,10}
        };

        for (int i = 0; i < array.Length; i++)
        {
            Thread th = new Thread(new ParameterizedThreadStart
            ((obj) =>
            {
                Parm p = (Parm)obj;
                int[] a = p.Value;
                for (int n=0;n<a.Length;n++)
                {
                    int j = a[n];
                    Console.WriteLine("in array " + p.Index +" "+ (n + 1) + " times");
                    if (!p.Cts.IsCancellationRequested)
                    {
                        if (j == 0)
                        {
                            p.Cts.Cancel();
                            Console.WriteLine("Cancelling at task " + p.Index + "th array");
                            break;
                        }
                        Thread.Sleep(1000);
                    }
                    else
                    {
                        break;
                    }
                }
            }
                ));
            th.Start(new Parm() { Cts = cts, Value = array[i] , Index=i});
        }
        Console.ReadLine();//敲下回车后,,

    }
    public class Parm
    {
        public CancellationTokenSource Cts;
        public int []Value;
        public int Index;

    }
}

  

运行结果,首先在第0个数组中找到,总共比较次数为20次左右.

也可以尝试用AutoResetEvent实现,用 WaitHandle.WaitAny()等待.

BeginInvoke 和 EndInvoke

例子1:说明异步执行,将一个数乘以2返回,异步后等待结果后求和.

class BeginInvokeTest
{
    public void Test()
    {
        Func<int, int> GetDouble = (a) => { Console.WriteLine(a); return a * 2; };

        IAsyncResult ar1 = GetDouble.BeginInvoke(100, null, null);
        IAsyncResult ar2 = GetDouble.BeginInvoke(200, null, null);

        int result = GetDouble.EndInvoke(ar1)+GetDouble.EndInvoke(ar2);
        Console.WriteLine("result = " + result);
    }
}

  

打印结果为

100

200

result = 600

或者

200

100

result = 600

100和200的打印是异步执行,不知道誰先执行完,但是求和得等到EndInvoke两个异步完成,才能计算

例子2:两个异步去发送消息,发送完成后,去做其他事情,并且在发送消息的回调中保存消息记录↓.

class BeginInvokeTest2//回调方法中处理
{
    public void Test()
    {
        Func<string, string> SendMessage = (a) =>
        {
            System.Threading.Thread.Sleep(1000); 
              Console.WriteLine("发送消息:"+a+" ,");
              return "OK";
        };
        AsyncCallback callback = (ar) =>
        {

            Parm p = (Parm)ar.AsyncState;
              if (p.F.EndInvoke(ar) == "OK")
              {
                  Console.WriteLine("消息发送成功,保存聊天记录 "+p.Message);
              }
        };

        Parm p1=new Parm(){ F=SendMessage, Message= "message1"};
        Parm p2 = new Parm() { F = SendMessage, Message = "message2" };

        IAsyncResult iar1 = SendMessage.BeginInvoke(p1.Message, callback, p1);
        IAsyncResult iar2 = SendMessage.BeginInvoke(p2.Message, callback, p2);

        iar1.AsyncWaitHandle.WaitOne();//等待执行完毕,并不是等待callback执行完
        iar2.AsyncWaitHandle.WaitOne();//等待执行完毕,并不是等待callback执行完

        Console.WriteLine("消息发完,做其他事情");
        Console.Read();
    }

    class Parm
    {
       public Func<string, string> F;
       public string Message;
    }
}

  

线程同步Interlocked.Increment

以原子操作的形式递增指定变量的值并存储结果

例子说明:用3种方法对初始值为1的进行一百次++操作.第一种直接加,第二,三种方法多线程.第三种用Interlocked.Increment.运行结果为第二种的结果值可能不为101.

class InterlockedTest
{
    public void Test()
    {
        int N = 1;
        for (int i = 1; i <= 100; i++)
        {
            N++;
        }
        Console.WriteLine(N);

        int M = 1;
        Action<object> action = obj =>
        {
            Parm p = (Parm)obj;
            M++;
            p.au.Set();

        };

        AutoResetEvent[] autos = new AutoResetEvent[100];
        for (int i = 1; i <= 100; i++)
        {
            autos[i-1] = new AutoResetEvent(false);
            ThreadPool.QueueUserWorkItem(a=>action(a), new Parm() { au = autos[i-1], index = i });
        }
        autos.ToList().ForEach(s => s.WaitOne());
        Console.WriteLine(M);

        int Q = 1;
        Action<object> action2 = obj =>
        {
            Parm p = (Parm)obj;
            Interlocked.Increment(ref Q);
            p.au.Set();

        };
        autos = new AutoResetEvent[100];
        for (int i = 1; i <= 100; i++)
        {
            autos[i - 1] = new AutoResetEvent(false);
            ThreadPool.QueueUserWorkItem(a => action2(a), new Parm() { au = autos[i - 1], index = i });
        }
        autos.ToList().ForEach(s => s.WaitOne());
        Console.WriteLine(Q);
    }

    public class Parm
    {
        public AutoResetEvent au;
        public int index;
    }
}

  

线程同步 Monitor.Enter和Lock

Lock关键字是一个语法糖,它将Monitor对象进行封装.

语法糖(Syntactic sugar),是由Peter J. Landin(和图灵一样的天才人物,是他最先发现了Lambda演算,由此而创立了函数式编程)创造的一个词语,它意指那些没有给计算机语言添加新功能,而只是对人类来说更“甜蜜”的语法。语法糖往往给程序员提供了更实用的编码方式,有益于更好的编码风格,更易读。不过其并没有给语言添加什么新东西。

例子同Interlocked.Increment,不再详细说明

class MonitorTest
{

    public void Test()
    {
        int N = 1;
        for (int i = 1; i <= 100; i++)
        {
            N++;
        }
        Console.WriteLine(N);

        int M = 1;
        Action<object> action = obj =>
        {
            Parm p = (Parm)obj;
            M++;
            p.au.Set();

        };

        AutoResetEvent[] autos = new AutoResetEvent[100];
        for (int i = 1; i <= 100; i++)
        {
            autos[i - 1] = new AutoResetEvent(false);
            ThreadPool.QueueUserWorkItem(a => action(a), new Parm() { au = autos[i - 1], index = i });
        }
        autos.ToList().ForEach(s => s.WaitOne());
        Console.WriteLine(M);

        int Q = 1;
        object objMonitor=new object();
        Action<object> action2 = obj =>
        {
            Parm p = (Parm)obj;
            Monitor.Enter(objMonitor);
            try
            {
                Q++;
            }
            finally
            {
                Monitor.Exit(objMonitor);
            }
            p.au.Set();

        };
        autos = new AutoResetEvent[100];
        for (int i = 1; i <= 100; i++)
        {
            autos[i - 1] = new AutoResetEvent(false);
            ThreadPool.QueueUserWorkItem(a => action2(a), new Parm() { au = autos[i - 1], index = i });
        }
        autos.ToList().ForEach(s => s.WaitOne());
        Console.WriteLine(Q);
    }

    public class Parm
    {
        public AutoResetEvent au;
        public int index;
    }
}

  

ReaderWriterLock

例子说明:ReaderWriterLockEntity有2个成员变量X,Y,读写方法各有2个,一个使用读写锁,一个直接读写.我们设定X和Y必须相等.

class ReaderWriterLockEntity
{
    ReaderWriterLock locker = new ReaderWriterLock();

    public int X { set; get; }
    public int Y { set; get; }

    public void ReadLock(ref int x, ref int y)
    {
        locker.AcquireReaderLock(Timeout.Infinite);
        try
        {
            x = this.X;
            y = this.Y;
        }
        finally
        {
            locker.ReleaseReaderLock();
        }
    }

    public void WriteLock(int x, int y)
    {
        locker.AcquireWriterLock(Timeout.Infinite);
        try
        {
            this.X = x;
            Thread.Sleep(10);
            this.Y = y;
        }
        finally
        {
            locker.ReleaseWriterLock();
        }
    }

    public void Read(ref int x, ref int y)
    {
        x = this.X;
        y = this.Y;
    }

    public void Write(int x, int y)
    {
        this.X = x;
        Thread.Sleep(10);
        this.Y = y;
    }
}

  

读写类,其中Test方法中,开启两组现场写线程,对ReaderWriterLockEntity中的X,Y进行自增.两组读线程,不停的打印X,Y.

打印的结果中,有X和Y不相等的结果出现.

Test2方法中,加了读写锁,打印的结果,不会有X和Y不等的情况.

ReaderWriterLock和Lock的区别就是,当加了ReaderLock,应该不会影响多个ReaderLock的增加.一个资源可以被Reader Lock多次.而lock则达不到这个要求.所以,ReadWriterLock在某些场景,比Lock效率更高.

class ReaderWriterTest2
{
    ReaderWriterLockEntity entity = new ReaderWriterLockEntity();
    public void Test()
    {


        Action writer = () =>
        {
            int a = 10;
            int b = 10;
            //Console.WriteLine("************** Write *************");

            for (int i = 0; i < 5; i++)
            {
                this.entity.Write(a++, b++);
                Thread.Sleep(10);
            }
        };

        Action reader = () =>
        {
           // Console.WriteLine("************** Reader *************");

            int x=0, y=0;
            for (int i = 0; i < 50; i++)
            {
                this.entity.Read(ref x, ref y);
                Console.WriteLine("Read:X={0},y={1}", x, y);
                Thread.Sleep(1);
            }
        };

        //Writer Threads
        Thread wt1 = new Thread(new ThreadStart(writer));
        wt1.Start();
        Thread wt2 = new Thread(new ThreadStart(writer));
        wt2.Start();

        //Reader Threads
        Thread rt1 = new Thread(new ThreadStart(reader));
        rt1.Start();
        Thread rt2 = new Thread(new ThreadStart(reader));
        rt2.Start();
    }

    public void Test2()
    {
        Action writer = () =>
        {
            int a = 10;
            int b = 10;
            for (int i = 0; i < 5; i++)
            {
                this.entity.WriteLock(a++, b++);
                Thread.Sleep(10);
            }
        };

        Action reader = () =>
        {
            int x = 0, y = 0;
            for (int i = 0; i < 50; i++)
            {
                this.entity.ReadLock(ref x, ref y);
                Console.WriteLine("Read:X={0},y={1}", x, y);
                Thread.Sleep(1);
            }
        };

        //Writer Threads
        Thread wt1 = new Thread(new ThreadStart(writer));
        wt1.Start();
        Thread wt2 = new Thread(new ThreadStart(writer));
        wt2.Start();

        //Reader Threads
        Thread rt1 = new Thread(new ThreadStart(reader));
        rt1.Start();
        Thread rt2 = new Thread(new ThreadStart(reader));
        rt2.Start();
    }
}

  

Semaphore信号量

例子说明:厕所有5个位置,每个人上厕所五秒.程序输入1后,表示有一个人要上厕所.如果厕所已满,则会自动等待.

class SemaphoreTest
{
    Semaphore sem = new Semaphore(5,5);//厕所空的

    public void In()
    {
        sem.WaitOne();
        Console.WriteLine("有空位,上厕所");
        Thread.Sleep(5000);//上厕所需要五秒
        sem.Release();//上完了
        Console.WriteLine("出厕所");
    }
    public void Out()
    {
        Thread.Sleep(5000);//上厕所需要五秒
        sem.Release();
        Console.WriteLine("出厕所");
    }
    public void Test()
    {
        while(true)
        {
            string input = Console.ReadLine();
            if (input == "1")//入厕
            {
              new Thread(new ThreadStart(()=>{ In();})).Start(); ;
            }

        }
    }
}

  

不同程序,不同的exe,只要信号量的Name相同,信号量是共享的。

例子说明:修改上厕所的程序.一个程序只负责入厕,什么时候出厕所,则由管理人员来控制(另一个程序). SemaphoreTest2运行后,输入5次1,表示有5个人入厕了.然后在输入几个1,表示还有人在等待.

class SemaphoreTest2
{
    Semaphore sem = new Semaphore(0, 5, "AAA");//厕所空的
    public void In()
    {
        sem.WaitOne();
        Console.WriteLine("有空位,上厕所");
    }

    public void Test()
    {
        while (true)
        {
            string input = Console.ReadLine();
            if (input == "1")//入厕
            {
                new Thread(new ThreadStart(() => { In(); })).Start(); ;
            }

        }
    }
}

  

在启动另外的项目,代码如下:启动后,输入2(喊一个人出厕所),发现入厕程序会有人进入厕所

class SemaphoreTest3
{
    Semaphore sem = new Semaphore(0,5,"AAA");//厕所空的
    public void Out()
    {
        Thread.Sleep(1000);//上厕所需要五秒
        sem.Release();
        Console.WriteLine("出厕所");
    }
    public void Test()
    {
        while(true)
        {
            string input = Console.ReadLine();
            if (input == "2")//出厕
            {
              new Thread(new ThreadStart(()=>{ Out();})).Start(); ;
            }

        }
    }
}

  

注意:这是2个不同的程序,他们的Semaphore的Name是相同的.

有一点不明白的地方是在程序1中运行 Semaphore sem = new Semaphore(0, 5, "AAA");//表示厕所满了. 在运行第二个程序 Semaphore sem = new Semaphore(5, 5, "AAA");这时候,程序1中仍然是满的.在new的时候,没有互相干扰.

注意下面这个构造函数,可以检测信号量Name是否重复.

public Semaphore(
int initialCount,
int maximumCount,
string name,
out bool createdNew
)

  

意外发现: Semaphore sem = new Semaphore(0, 5,"厕所"); 如果Name为中文,new Semaphore(0, 5,"厕所")和new Semaphore(5, 5,"厕所")是一样的.一开始就会有5个可用信号量.

Mutex

只理解了这句,Mutex本身是可以系统级别的,所以是可以跨越进程的。比如我们要实现一个软件不能同时打开两次,那么Mutex是可以实现的,而lock和monitor是无法实现的。其他和lock的区别,其实没太懂.

推荐阅读