一些基本概念
LockSupport
JDK提供的暂停线程的基本类,用C++写的本地方法
- park() 方法用来暂停
- unpark(t) 方法解除暂停 t表示暂停的线程
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
public class T13_TestLockSupport {
public static void main(String[] args) {
Thread t = new Thread(()->{
for (int i = 0; i < 10; i++) {
System.out.println(i);
if(i == 5) {
LockSupport.park();
}
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
t.start();
LockSupport.unpark(t);
}
}
AQS
-
AbstractQueueSynchronizer 同步器,内部的数据结构为双向链表
-
主要实现是一个volatile修饰的int类型的state,然后采用Unsafe的CAS去修改,
-
主要流程是将想要获取资源的线程构造成一个Node节点
-
节点构造完成后就是要进行获取资源了
-
首先检查队列是否初始化没有就初始化然后返回前驱节点
private Node enq(Node node) { for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return oldTail; } } else { initializeSyncQueue(); } } }
-
在非公平模式下,新加入的节点会先去队头利用CAS抢一下执行,不行就插入到队尾
-
在公平模式下会直接到队尾
-
- 所以一般来说非公平模式效率会更好
ReentrantLock
-
基于AQS实现的一个可重入锁,分为公平模式与非公平模式
-
在使用该类进行加锁时,可以创建不同的condition进行控制 也就是分条件控制
-
调用lock方法之后要跟着try catch
-
lock方法进行加锁 unlock方法进行解锁
import java.util.concurrent.locks.ReentrantLock;
public class T05_ReentrantLock5 extends Thread {
private static ReentrantLock lock=new ReentrantLock(true); //����Ϊtrue��ʾΪ��ƽ������Ա�������
@Override
public void run() {
for(int i=0; i<100; i++) {
lock.lock();
try{
System.out.println(Thread.currentThread().getName()+"�����");
}finally{
lock.unlock();
}
}
}
public static void main(String[] args) {
T05_ReentrantLock5 rl=new T05_ReentrantLock5();
Thread th1=new Thread(rl);
Thread th2=new Thread(rl);
th1.start();
th2.start();
}
}
semaphore
-
信号量 类比车道或者收费站
-
基于AQS实现的一个控制流量的锁
-
在其内部有一个继承了AQS的内部类 同ReentrantLock一样
-
只有一定数量的线程才可以运行 在创建时指定 也就是AQS中的state变量开始前acquire()获取 在运行完后要使用release方法释放
package juc.main.java.com.mashibing.juc.c_020; import java.util.concurrent.Semaphore; public class T11_TestSemaphore { public static void main(String[] args) { //Semaphore s = new Semaphore(2); Semaphore s = new Semaphore(2, true); //允许一个线程同时执行 //Semaphore s = new Semaphore(1);new Thread(()->{ try { //获取信号量 s.acquire(); System.out.println("T1 running..."); Thread.sleep(200); System.out.println("T1 running..."); } catch (InterruptedException e) { e.printStackTrace(); } finally { //释放信号量 s.release(); } }).start(); new Thread(()->{ try { s.acquire(); System.out.println("T2 running..."); Thread.sleep(200); System.out.println("T2 running..."); s.release(); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
CountDownLatch
- 顾名思义,倒数的栅栏,就是等待全部线程或者过程完成后才可以进行下一步操作
- 适用于那些需要全部完成后才能进行下一步的操作比如处理excel表格的sheet 当全部sheet处理完后再进行下一步
- 使用join也可达到此效果 但是效率较低也更麻烦
package juc.main.java.com.mashibing.juc.c_020;
import java.util.concurrent.CountDownLatch;
public class T06_TestCountDownLatch {
public static void main(String[] args) {
usingJoin();
usingCountDownLatch();
}
private static void usingCountDownLatch() {
Thread[] threads = new Thread[100];
CountDownLatch latch = new CountDownLatch(threads.length);
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
latch.countDown();
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end latch");
}
private static void usingJoin() {
Thread[] threads = new Thread[100];
for(int i=0; i<threads.length; i++) {
threads[i] = new Thread(()->{
int result = 0;
for(int j=0; j<10000; j++) result += j;
});
}
for (int i = 0; i < threads.length; i++) {
threads[i].start();
}
for (int i = 0; i < threads.length; i++) {
try {
threads[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("end join");
}
}
CyclicBarrier
- 意为循环的栅栏,跟countDownLacth差不多只不过这个可以循环 可以使用reset进行重置
- 调用await会自动将state减一
- 可以在初始化时传入一个回调函数,当一轮完成后会自动进行调用
package juc.main.java.com.mashibing.juc.c_020;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class T07_TestCyclicBarrier {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(20, () -> System.out.println("hhh"));
for(int i=0; i<100; i++) {
new Thread(()->{
try {
barrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}).start();
}
}
}
ReadWriteLock
- 顾名思义:读写锁
- 调用readLock返回读锁
- 调用writeLock返回写锁
- 读锁时共享锁 写锁是独占锁
- 写时不可读 读时不可写 读时可多个线程读 写时只有一个线程可写
package juc.main.java.com.mashibing.juc.c_020;
import java.util.Random;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class T10_TestReadWriteLock {
static Lock lock = new ReentrantLock();
private static int value;
static ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
static Lock readLock = readWriteLock.readLock();
static Lock writeLock = readWriteLock.writeLock();
public static void read(Lock lock) {
readLock.lock();
try {
Thread.sleep(1000);
System.out.println("read over!");
//模拟读取操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void write(Lock lock, int v) {
writeLock.lock();
try {
Thread.sleep(1000);
value = v;
System.out.println("write over!");
//模拟写操作
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public static void main(String[] args) {
Runnable readR = () -> read(readLock);
//Runnable writeR = ()->write(lock, new Random().nextInt());
Runnable writeR = () -> write(writeLock, new Random().nextInt());
for (int i = 0; i < 18; i++) new Thread(readR).start();
for (int i = 0; i < 2; i++) new Thread(writeR).start();
}
}
其他不基于AQS的Lock
Phaser
- 使用阶段任务 具体不清楚
package juc.main.java.com.mashibing.juc.c_020;
import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
public class T09_TestPhaser2 {
static Random r = new Random();
static MarriagePhaser phaser = new MarriagePhaser();
static void milliSleep(int milli) {
try {
TimeUnit.MILLISECONDS.sleep(milli);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
phaser.bulkRegister(7);
for(int i=0; i<5; i++) {
new Thread(new Person("小王" + i)).start();
}
new Thread(new Person("小李")).start();
new Thread(new Person("校长")).start();
}
static class MarriagePhaser extends Phaser {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
switch (phase) {
case 0:
System.out.println("1" + registeredParties);
System.out.println();
return false;
case 1:
System.out.println("2" + registeredParties);
System.out.println();
return false;
case 2:
System.out.println("3" + registeredParties);
System.out.println();
return false;
case 3:
System.out.println("4" + registeredParties);
return true;
default:
return true;
}
}
}
static class Person implements Runnable {
String name;
public Person(String name) {
this.name = name;
}
public void arrive() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 到达\n", name);
phaser.arriveAndAwaitAdvance();
}
public void eat() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 吃!\n", name);
phaser.arriveAndAwaitAdvance();
}
public void leave() {
milliSleep(r.nextInt(1000));
System.out.printf("%s 离开\n", name);
phaser.arriveAndAwaitAdvance();
}
private void hug() {
if(name.equals("校长") || name.equals("小李")) {
milliSleep(r.nextInt(1000));
System.out.printf("%s 拥抱\n", name);
phaser.arriveAndAwaitAdvance();
} else {
phaser.arriveAndDeregister();
//phaser.register()
}
}
@Override
public void run() {
arrive();
eat();
leave();
hug();
}
}
}
Exchanger
- 交换器
- 两个线程交换数据使用 一方不到达 另一方不走
- 必须使用同一个exchanger才行 要不然鬼知道到没到
package juc.main.java.com.mashibing.juc.c_020;
import java.util.concurrent.Exchanger;
public class T12_TestExchanger {
static Exchanger<String> exchanger = new Exchanger<>();
public static void main(String[] args) {
new Thread(()->{
String s = "T1";
try {
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t1").start();
new Thread(()->{
String s = "T2";
try {
//要交换的
s = exchanger.exchange(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " " + s);
}, "t2").start();
}
}