首页 > 技术文章 > 一些基本的锁

clion 2020-12-03 01:19 原文

一些基本概念

LockSupport

JDK提供的暂停线程的基本类,用C++写的本地方法

  • park() 方法用来暂停
  • unpark(t) 方法解除暂停 t表示暂停的线程
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

public class T13_TestLockSupport {
    public static void main(String[] args) {
        Thread t = new Thread(()->{
            for (int i = 0; i < 10; i++) {
                System.out.println(i);
                if(i == 5) {
                    LockSupport.park();
                }
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

        t.start();

        LockSupport.unpark(t);

    }
}

AQS

  • AbstractQueueSynchronizer 同步器,内部的数据结构为双向链表

  • 主要实现是一个volatile修饰的int类型的state,然后采用Unsafe的CAS去修改,

    • 主要流程是将想要获取资源的线程构造成一个Node节点

    • 节点构造完成后就是要进行获取资源了

    • 首先检查队列是否初始化没有就初始化然后返回前驱节点

          private Node enq(Node node) {
              for (;;) {
                  Node oldTail = tail;
                  if (oldTail != null) {
                      node.setPrevRelaxed(oldTail);
                      if (compareAndSetTail(oldTail, node)) {
                          oldTail.next = node;
                          return oldTail;
                      }
                  } else {
                      initializeSyncQueue();
                  }
              }
          }
      
    • 在非公平模式下,新加入的节点会先去队头利用CAS抢一下执行,不行就插入到队尾

    • 在公平模式下会直接到队尾

  • 所以一般来说非公平模式效率会更好

ReentrantLock

  • 基于AQS实现的一个可重入锁,分为公平模式与非公平模式

  • 在使用该类进行加锁时,可以创建不同的condition进行控制 也就是分条件控制

  • 调用lock方法之后要跟着try catch

  • lock方法进行加锁 unlock方法进行解锁

import java.util.concurrent.locks.ReentrantLock;

public class T05_ReentrantLock5 extends Thread {
      
   private static ReentrantLock lock=new ReentrantLock(true); //����Ϊtrue��ʾΪ��ƽ������Ա�������
    @Override
    public void run() {
        for(int i=0; i<100; i++) {
            lock.lock();
            try{
                System.out.println(Thread.currentThread().getName()+"�����");
            }finally{
                lock.unlock();
            }
        }
    }
    public static void main(String[] args) {
        T05_ReentrantLock5 rl=new T05_ReentrantLock5();
        Thread th1=new Thread(rl);
        Thread th2=new Thread(rl);
        th1.start();
        th2.start();
    }
}

semaphore

  • 信号量 类比车道或者收费站

  • 基于AQS实现的一个控制流量的锁

  • 在其内部有一个继承了AQS的内部类 同ReentrantLock一样

  • 只有一定数量的线程才可以运行 在创建时指定 也就是AQS中的state变量开始前acquire()获取 在运行完后要使用release方法释放

    package juc.main.java.com.mashibing.juc.c_020;
    
    import java.util.concurrent.Semaphore;
    
    public class T11_TestSemaphore {
        public static void main(String[] args) {
            //Semaphore s = new Semaphore(2);
            Semaphore s = new Semaphore(2, true);
            //允许一个线程同时执行
            //Semaphore s = new Semaphore(1);new Thread(()->{
            try {
                //获取信号量
                s.acquire();
    
                System.out.println("T1 running...");
                Thread.sleep(200);
                System.out.println("T1 running...");
    
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                //释放信号量
                s.release();
            }
        }).start();
    
        new Thread(()->{
            try {
                s.acquire();
    
                System.out.println("T2 running...");
                Thread.sleep(200);
                System.out.println("T2 running...");
    
                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();
    }
    

CountDownLatch

  • 顾名思义,倒数的栅栏,就是等待全部线程或者过程完成后才可以进行下一步操作
  • 适用于那些需要全部完成后才能进行下一步的操作比如处理excel表格的sheet 当全部sheet处理完后再进行下一步
  • 使用join也可达到此效果 但是效率较低也更麻烦
package juc.main.java.com.mashibing.juc.c_020;

import java.util.concurrent.CountDownLatch;

public class T06_TestCountDownLatch {
    public static void main(String[] args) {
        usingJoin();
        usingCountDownLatch();
    }

    private static void usingCountDownLatch() {
        Thread[] threads = new Thread[100];
        CountDownLatch latch = new CountDownLatch(threads.length);

        for(int i=0; i<threads.length; i++) {
            threads[i] = new Thread(()->{
                int result = 0;
                for(int j=0; j<10000; j++) result += j;
                latch.countDown();
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("end latch");
    }

    private static void usingJoin() {
        Thread[] threads = new Thread[100];

        for(int i=0; i<threads.length; i++) {
            threads[i] = new Thread(()->{
                int result = 0;
                for(int j=0; j<10000; j++) result += j;
            });
        }

        for (int i = 0; i < threads.length; i++) {
            threads[i].start();
        }

        for (int i = 0; i < threads.length; i++) {
            try {
                threads[i].join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        System.out.println("end join");
    }
}

CyclicBarrier

  • 意为循环的栅栏,跟countDownLacth差不多只不过这个可以循环 可以使用reset进行重置
  • 调用await会自动将state减一
  • 可以在初始化时传入一个回调函数,当一轮完成后会自动进行调用
package juc.main.java.com.mashibing.juc.c_020;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class T07_TestCyclicBarrier {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("hhh"));
        for(int i=0; i<100; i++) {

                new Thread(()->{
                    try {
                        barrier.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            
        }
    }
}

ReadWriteLock

  • 顾名思义:读写锁
  • 调用readLock返回读锁
  • 调用writeLock返回写锁
  • 读锁时共享锁 写锁是独占锁
  • 写时不可读 读时不可写 读时可多个线程读 写时只有一个线程可写
package juc.main.java.com.mashibing.juc.c_020;

import java.util.Random;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class T10_TestReadWriteLock {
    static Lock lock = new ReentrantLock();
    private static int value;

    static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
    static Lock readLock = readWriteLock.readLock();
    static Lock writeLock = readWriteLock.writeLock();

    public static void read(Lock lock) {
        readLock.lock();
        try {
            Thread.sleep(1000);
            System.out.println("read over!");
            //模拟读取操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void write(Lock lock, int v) {
        writeLock.lock();
        try {
            Thread.sleep(1000);
            value = v;
            System.out.println("write over!");
            //模拟写操作
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }


    public static void main(String[] args) {
        Runnable readR = () -> read(readLock);

        //Runnable writeR = ()->write(lock, new Random().nextInt());
        Runnable writeR = () -> write(writeLock, new Random().nextInt());

        for (int i = 0; i < 18; i++) new Thread(readR).start();
        for (int i = 0; i < 2; i++) new Thread(writeR).start();


    }
}

其他不基于AQS的Lock

Phaser

  • 使用阶段任务 具体不清楚
package juc.main.java.com.mashibing.juc.c_020;

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class T09_TestPhaser2 {
    static Random r = new Random();
    static MarriagePhaser phaser = new MarriagePhaser();


    static void milliSleep(int milli) {
        try {
            TimeUnit.MILLISECONDS.sleep(milli);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {

        phaser.bulkRegister(7);

        for(int i=0; i<5; i++) {

            new Thread(new Person("小王" + i)).start();
        }

        new Thread(new Person("小李")).start();
        new Thread(new Person("校长")).start();

    }



    static class MarriagePhaser extends Phaser {
        @Override
        protected boolean onAdvance(int phase, int registeredParties) {

            switch (phase) {
                case 0:
                    System.out.println("1" + registeredParties);
                    System.out.println();
                    return false;
                case 1:
                    System.out.println("2" + registeredParties);
                    System.out.println();
                    return false;
                case 2:
                    System.out.println("3" + registeredParties);
                    System.out.println();
                    return false;
                case 3:
                    System.out.println("4" + registeredParties);
                    return true;
                default:
                    return true;
            }
        }
    }


    static class Person implements Runnable {
        String name;

        public Person(String name) {
            this.name = name;
        }

        public void arrive() {

            milliSleep(r.nextInt(1000));
            System.out.printf("%s 到达\n", name);
            phaser.arriveAndAwaitAdvance();
        }

        public void eat() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 吃!\n", name);
            phaser.arriveAndAwaitAdvance();
        }

        public void leave() {
            milliSleep(r.nextInt(1000));
            System.out.printf("%s 离开\n", name);


            phaser.arriveAndAwaitAdvance();
        }

        private void hug() {
            if(name.equals("校长") || name.equals("小李")) {
                milliSleep(r.nextInt(1000));
                System.out.printf("%s 拥抱\n", name);
                phaser.arriveAndAwaitAdvance();
            } else {
                phaser.arriveAndDeregister();
                //phaser.register()
            }
        }

        @Override
        public void run() {
            arrive();
            eat();
            leave();
            hug();
        }
    }
}

Exchanger

  • 交换器
  • 两个线程交换数据使用 一方不到达 另一方不走
  • 必须使用同一个exchanger才行 要不然鬼知道到没到
package juc.main.java.com.mashibing.juc.c_020;

import java.util.concurrent.Exchanger;

public class T12_TestExchanger {

    static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args) {
        new Thread(()->{
            String s = "T1";
            try {
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t1").start();


        new Thread(()->{
            String s = "T2";
            try {
                //要交换的
                s = exchanger.exchange(s);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + " " + s);

        }, "t2").start();
    }
}

推荐阅读