首页 > 技术文章 > (八) 显式锁Lock的常用API

monco-sxy 2020-07-04 15:10 原文

显示锁的必要性

Java程序是靠synchronized关键字实现锁功能的,使用synchronized关键字将会隐式地获取锁,但是它将锁的获取和释放固化了,也就是先获取再释放。

Lock的基本用法

在finally块中释放锁,目的是保证在获取到锁之后,最终能够被释放。
不要将获取锁的过程写在try块中,因为如果在获取锁(自定义锁的实现)时发生了异常,异常抛出的同时,也会导致锁无故释放。

Lock的常用API

Lock的常用分类

1. ReentrantLock(可重入锁)
synchronized 内部也是支持可重入的,可重入锁及在发生一个线程获取当前方法锁的同时,接着访问这个方法,不需要重新获取锁,即在递归的时候,不会出现获取不到锁而锁死的情况。


import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author : monco
 * @date : 2019/10/11 1:46
 * className: LockCase
 * description: 锁的基本使用
 */
public class LockCase {

    /**
     * 定义可重入锁
     */
    private Lock lock = new ReentrantLock();

    private AtomicInteger ageAtomic = new AtomicInteger(10000);

    private Integer age = 10000;

    private static class TestThread extends Thread {

        private LockCase lockCase;

        public TestThread(LockCase lockCase, String name) {
            super(name);
            this.lockCase = lockCase;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100000; i++) {
                lockCase.test();
            }
            System.out.println(Thread.currentThread().getName()
                    + " ageAtomic =  " + lockCase.getAgeAtomic());
            System.out.println(Thread.currentThread().getName()
                    + " age =  " + lockCase.getAge());
        }
    }

    public void test() {
//        lock.lock();
//        try {
//            age++;
//        } finally {
//            lock.unlock();
//        }
        age++;
        ageAtomic.getAndIncrement();
    }

    public void test2() {
//        lock.lock();
//        try {
//            age--;
//        } finally {
//            lock.unlock();
//        }
        age--;
        ageAtomic.getAndDecrement();
    }

    public int getAgeAtomic() {
        return ageAtomic.get();
    }

    public int getAge() {
        return age;
    }

    /**
     * 主线程不断递减
     * 子线程不断累加
     * 最终的结果不变,可以使用之前的 AtomicInteger
     * @param args
     * @throws InterruptedException
     */
    public static void main(String[] args) throws InterruptedException {
        LockCase lockCase = new LockCase();
        Thread endThread = new TestThread(lockCase, "endThread");
        endThread.start();
        for (int i = 0; i < 100000; i++) {
            lockCase.test2();
        }
        // 主线程休眠1s 使用打印语句不会提前打印
        Thread.sleep(1000);
        System.out.println(Thread.currentThread().getName()
                + " ageAtomic =  " + lockCase.getAgeAtomic());
        System.out.println(Thread.currentThread().getName()
                + " age =  " + lockCase.getAge());

    }
}

结果

使用 AtomicInteger 或者 使用锁 可以保证 多线程下数据会正常运行,而不使用锁的情况下,则不能保证。

2. 公平锁和非公平锁

如果在时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。
ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。事实上,公平的锁机制往往没有非公平的效率高。
在激烈竞争的情况下,非公平锁的性能高于公平锁的性能的一个原因是:在恢复一个被挂起的线程与该线程真正开始运行之间存在着严重的延迟。假设线程A持有一个锁,并且线程B请求这个锁。由于这个锁已被线程A持有,因此B将被挂起。当A释放锁时,B将被唤醒,因此会再次尝试获取锁。与此同时,如果C也请求这个锁,那么C很可能会在B被完全唤醒之前获得、使用以及释放这个锁。这样的情况是一种“双赢”的局面:B获得锁的时刻并没有推迟,C更早地获得了锁,并且吞吐量也获得了提高。

3. 读写锁
之前提到锁(如Mutex和ReentrantLock)基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
除了保证写操作对读操作的可见性以及并发性的提升之外,读写锁能够简化读写交互场景的编程方式。
假设在程序中定义一个共享的用作缓存数据结构,它大部分时间提供读服务(例如查询和搜索),而写操作占有的时间很少,但是写操作完成之后的更新需要对后续的读服务可见。
在没有读写锁支持的(Java 5之前)时候,如果需要完成上述工作就要使用Java的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(写操作之间依靠synchronized关键进行同步),这样做的目的是使读操作能读取到正确的数据,不会出现脏读。改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,编程方式相对于使用等待通知机制的实现方式而言,变得简单明了。
一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。

使用 读写锁


import com.monco.util.SleepTools;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * @author : monco
 * @date : 2019/10/11 1:46
 * className: UseRwLock
 * description: 使用读写锁实现
 */
public class UseRwLock implements GoodsService{

    private GoodsInfo goodsInfo;

    private final ReadWriteLock lock = new ReentrantReadWriteLock();

    private final Lock getLock = lock.readLock();

    private final Lock setLock = lock.writeLock();

    public UseRwLock(GoodsInfo goodsInfo) {
        this.goodsInfo = goodsInfo;
    }

    @Override
    public GoodsInfo getNum() {
        getLock.lock();
        try{
            SleepTools.ms(5);
            return this.goodsInfo;
        }finally {
            getLock.unlock();
        }
    }

    @Override
    public void setNum(int number) {
        setLock.lock();
        try{
            SleepTools.ms(5);
            goodsInfo.changeNumber(number);
        }finally {
            setLock.unlock();
        }
    }
}

使用 synchronized 关键字


import com.monco.util.SleepTools;

/**
 * @author : monco
 * @date : 2019/10/11 1:46
 * className: UseSyn
 * description: 用内置锁来实现商品服务接口
 */
public class UseSyn implements GoodsService {

    private GoodsInfo goodsInfo;

    public UseSyn(GoodsInfo goodsInfo) {
        this.goodsInfo = goodsInfo;
    }

    @Override
    public synchronized GoodsInfo getNum() {
        SleepTools.ms(5);
        return this.goodsInfo;
    }

    @Override
    public synchronized void setNum(int number) {
        SleepTools.ms(5);
        goodsInfo.changeNumber(number);
    }
}

测试

package com.monco.ch3.rw;

import com.monco.util.SleepTools;

import java.util.Random;

/**
 * @author : monco
 * @date : 2019/10/11 1:46
 * className: BusiApp
 * description: 对商品进行业务的应用
 */
public class BusiApp {

    static final int readWriteRatio = 10;

    static final int minthreadCount = 3;

    /**
     * 读取商品信息 读操作
     */
    private static class GetThread implements Runnable {

        private GoodsService goodsService;

        public GetThread(GoodsService goodsService) {
            this.goodsService = goodsService;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            // 读数据 100 次
            for (int i = 0; i < 100; i++) {
                goodsService.getNum();
            }
            System.out.println(Thread.currentThread().getName() + "读取商品数据耗时:"
                    + (System.currentTimeMillis() - start) + "ms");

        }
    }

    /**
     * 设置商品信息 写操作
     */
    private static class SetThread implements Runnable {

        private GoodsService goodsService;

        public SetThread(GoodsService goodsService) {
            this.goodsService = goodsService;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            Random r = new Random();
            // 写数据 10次
            for (int i = 0; i < 10; i++) {
                SleepTools.ms(50);
                goodsService.setNum(r.nextInt(10));
            }
            System.out.println(Thread.currentThread().getName()
                    + "写商品数据耗时:" + (System.currentTimeMillis() - start) + "ms---------");

        }
    }

    public static void main(String[] args) throws InterruptedException {
        GoodsInfo goodsInfo = new GoodsInfo("Cup", 100000, 10000);
        GoodsService goodsService = new UseRwLock(goodsInfo);
//        GoodsService goodsService = new UseSyn(goodsInfo);
        for (int i = 0; i < minthreadCount; i++) {
            // 启动3个线程写
            Thread setT = new Thread(new SetThread(goodsService));
            // 启动30个线程读
            for (int j = 0; j < readWriteRatio; j++) {
                Thread getT = new Thread(new GetThread(goodsService));
                getT.start();
            }
            SleepTools.ms(100);
            setT.start();
        }
    }
}

结果:
使用读写锁:

使用关键字:

明显看出差距。

Condition的使用

package com.monco.ch3.condition;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
 * @author : monco
 * @date : 2019/10/11 1:46
 * className: ExpressCond
 * description: condition 的 使用
 */
public class ExpressCond {

    public final static String CITY = "ShangHai";

    /**
     * 快递运输里程数
     */
    private int km;

    /**
     * 快递到达地点
     */
    private String site;

    private Lock kmLock = new ReentrantLock();
    private Lock siteLock = new ReentrantLock();
    private Condition kmCond = kmLock.newCondition();
    private Condition siteCond = siteLock.newCondition();

    public ExpressCond() {
    }

    public ExpressCond(int km, String site) {
        this.km = km;
        this.site = site;
    }

    /**
     * 变化公里数,然后通知处于wait状态并需要处理公里数的线程进行业务处理
     */
    public void changeKm() {
        kmLock.lock();
        try {
            this.km = 101;
            kmCond.signal();
            //kmCond.signalAll();
        } finally {
            kmLock.unlock();
        }


    }

    /**
     * 变化地点,然后通知处于wait状态并需要处理地点的线程进行业务处理
     */
    public void changeSite() {
        siteLock.lock();
        try {
            this.site = "BeiJing";
            // 通知其他在锁上等待的线程
            siteCond.signal();
        } finally {
            siteLock.unlock();
        }
    }

    /**
     * 当快递的里程数大于100时更新数据库
     */
    public void waitKm() {
        kmLock.lock();
        try {
            while (this.km < 100) {
                try {
                    kmCond.await();
                    System.out.println("Check Site thread["
                            + Thread.currentThread().getId()
                            + "] is be notified");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            kmLock.unlock();
        }
        System.out.println("the Km is " + this.km + ",I will change db");
    }

    /**
     * 当快递到达目的地时通知用户
     */
    public void waitSite() {
        siteLock.lock();
        try {
            while (this.site.equals(CITY)) {
                try {
                    // 当前线程进行等待
                    siteCond.await();
                    System.out.println("check Site thread[" + Thread.currentThread().getName()
                            + "] is be notify");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        } finally {
            siteLock.unlock();
        }
        System.out.println("the site is " + this.site + ",I will call user");
    }
}

测试

package com.monco.ch3.condition;

/**
 * @author : monco
 * @date : 2019/10/11 1:46
 * className: TestCond
 * description: 测试Lock和Condition实现等待通知
 */
public class TestCond {

    private static ExpressCond express = new ExpressCond(0, ExpressCond.CITY);

    /**
     * 检查里程数变化的线程,不满足条件,线程一直等待
     */
    private static class CheckKm extends Thread {
        @Override
        public void run() {
            express.waitKm();
        }
    }

    /**
     * 检查地点变化的线程,不满足条件,线程一直等待
     */
    private static class CheckSite extends Thread {
        @Override
        public void run() {
            express.waitSite();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 3; i++) {
            new CheckSite().start();
        }
        for (int i = 0; i < 3; i++) {
            new CheckKm().start();
        }
        Thread.sleep(1000);
        //快递里程变化
        express.changeKm();
    }
}

结果

这个和synconized 的 wait 和 notify notifyAll 是一个道理,无非是针对锁Lock的一种实现方式罢了。
在使用condition的时候,我们大部分情况已经明确需要唤醒哪些线程,所以说大部分使用signal就可以,而不需要使用signalAll()

推荐阅读