多线程(五)线程协作、线程池
线程协作
生产者消费者问题
-
线程同步问题,生产者和消费者共享同一个资源,并且生产者和消费者之间相互依赖,互为条件
- 生产者:没有生产产品之前,要通知消费者;生产了产品之后,要通知消费者消费
- 消费者:消费之后,通知生产者已经结束消费,需要生产新的产品
-
生产者消费者问题中,仅有synchronized是不够的
- synchronized可阻止并发更新同一个共享资源,实现了同步
- synchronized不能用来实现不同线程之间的消息传递(通信)
-
Java提供的(Object类)解决线程之间的通信问题的方法:
方法名 作用 wait() 表示线程一直等待,直到其他线程通知,与sleep不同,会释放锁 wait(long timeout) 指定等待的毫秒数 notify() 唤醒一个处于等待状态的线程 notifyAll() 唤醒同一个对象上所有调用wait()方法的线程,优先级别高的线程优先调度
生产者消费者问题的解决方式
解决方式一:管程法
- 生产者将生产好的数据放入缓冲区,消费者从缓冲区中拿出数据
- 生产者:负责生产数据的模块(可能是方法,对象,线程,进程)
- 消费者:负责处理数据的模块(可能是方法,对象,线程,进程)
- 缓冲区:消费者不能直接使用生产者的数据,他们之间有个“缓冲区”
解决方式二:信号灯法
- 通过信号灯判断是否执行(通过True和False判断是否执行)
线程池
- 背景:由于经常创建和销毁、使用量特别大的资源,比如并发情况下的线程,对性能影响很大,所以用到线程池
- 提前创建好多线程,放入线程池中,使用时直接获取,使用完放回池中。
- 避免频繁创建销毁,实现重复利用
- 线程管理:
- corePoolSize:核心池的大小
- maximumPoolSize:最大线程数
- keepAliveTime:线程没有任务时最多保持多长时间后会终止
线程池相关类:
-
ExecutorService和Executors
-
ExecutorService(接口):
-
ExecutorService是线程池接口
-
子类:ThreadPoolExecutor
-
方法
void execute(Runnable command)//执行任务/命令,没有返回值,一般用来执行Runnable
<T>Fture<T>submit(Callable<T>tasks)//执行任务,有返回值,一般用来执行Callable
void shutdown()//关闭连接池
-
-
Executors:
工具类、线程池的工厂类,用于创建并返回不同类型的线程池
-
-
ExecutorService是一个接口,通过Executors工具类创建池子
代码实例
管程法
- 吃鸡与生产鸡
//测试:生产者消费者模型-->利用缓冲区解决:管程法
public class TestPC {
public static void main(String[] args) {
SynContainer container=new SynContainer();
new Productor(container).start();
new Consumer(container).start();
}
}
//生产者
class Productor extends Thread{
SynContainer container;
public Productor(SynContainer container){
this.container=container;
}
//生产
@Override
public void run() {
for (int i = 0; i < 30; i++) {
System.out.println("生产了"+i+"号鸡-----》第"+(i+1)+"只鸡");
container.push(new Chicken(i));
}
}
}
//消费者
class Consumer extends Thread{
SynContainer container;
public Consumer(SynContainer container){
this.container=container;
}
@Override
public void run() {
for (int i = 0; i < 30; i++) {
System.out.println("消费了"+container.pop().id+"号鸡------》消费的第"+(i+1)+"只鸡");
}
}
}
//产品
class Chicken{
int id;//产品编号
public Chicken(int id){
this.id=id;
}
}
//缓冲区
class SynContainer{
//容器大小
Chicken[] chickens=new Chicken[10];
//容器计数
int count=0;
//生产者放入产品
public synchronized void push(Chicken chicken){
//如果容器满了,就需要等待消费者
if(count==chickens.length){
//生产等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果没有满,则继续消费
chickens[count]=chicken;
count++;
//通知消费者消费
this.notify();
}
//消费者消费产品
public synchronized Chicken pop(){
//判断能否消费
//如果不能消费
if(count==0){
//等待生产者生产,消费者等待
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//如果可以消费
count--;
//吃一个鸡,拿一个鸡
Chicken chicken=chickens[count];
//吃完了,通知生产者生产
this.notify();
return chicken;
}
}
//输出:
生产了0号鸡-----》第1只鸡
生产了1号鸡-----》第2只鸡
生产了2号鸡-----》第3只鸡
生产了3号鸡-----》第4只鸡
生产了4号鸡-----》第5只鸡
生产了5号鸡-----》第6只鸡
生产了6号鸡-----》第7只鸡
生产了7号鸡-----》第8只鸡
生产了8号鸡-----》第9只鸡
生产了9号鸡-----》第10只鸡
生产了10号鸡-----》第11只鸡
生产了11号鸡-----》第12只鸡
消费了9号鸡------》消费的第1只鸡
消费了10号鸡------》消费的第2只鸡
生产了12号鸡-----》第13只鸡
消费了11号鸡------》消费的第3只鸡
生产了13号鸡-----》第14只鸡
消费了12号鸡------》消费的第4只鸡
生产了14号鸡-----》第15只鸡
消费了13号鸡------》消费的第5只鸡
生产了15号鸡-----》第16只鸡
消费了14号鸡------》消费的第6只鸡
生产了16号鸡-----》第17只鸡
消费了15号鸡------》消费的第7只鸡
生产了17号鸡-----》第18只鸡
消费了16号鸡------》消费的第8只鸡
生产了18号鸡-----》第19只鸡
消费了17号鸡------》消费的第9只鸡
生产了19号鸡-----》第20只鸡
消费了18号鸡------》消费的第10只鸡
生产了20号鸡-----》第21只鸡
消费了19号鸡------》消费的第11只鸡
生产了21号鸡-----》第22只鸡
消费了20号鸡------》消费的第12只鸡
生产了22号鸡-----》第23只鸡
消费了21号鸡------》消费的第13只鸡
生产了23号鸡-----》第24只鸡
消费了22号鸡------》消费的第14只鸡
生产了24号鸡-----》第25只鸡
消费了23号鸡------》消费的第15只鸡
生产了25号鸡-----》第26只鸡
消费了24号鸡------》消费的第16只鸡
生产了26号鸡-----》第27只鸡
消费了25号鸡------》消费的第17只鸡
生产了27号鸡-----》第28只鸡
消费了26号鸡------》消费的第18只鸡
生产了28号鸡-----》第29只鸡
消费了27号鸡------》消费的第19只鸡
生产了29号鸡-----》第30只鸡
消费了28号鸡------》消费的第20只鸡
消费了29号鸡------》消费的第21只鸡
消费了8号鸡------》消费的第22只鸡
消费了7号鸡------》消费的第23只鸡
消费了6号鸡------》消费的第24只鸡
消费了5号鸡------》消费的第25只鸡
消费了4号鸡------》消费的第26只鸡
消费了3号鸡------》消费的第27只鸡
消费了2号鸡------》消费的第28只鸡
消费了1号鸡------》消费的第29只鸡
消费了0号鸡------》消费的第30只鸡
信号灯法
- 有节目时观众观看,没有节目时观众不看,演员准备
public class TestPc2 {
public static void main(String[] args) {
TV tv=new TV();
new Player(tv).start();
new Watcher(tv).start();
}
}
//生产者--》演员
class Player extends Thread{
TV tv;
public Player(TV tv){
this.tv=tv;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
if(i%2==0){
this.tv.play("节目一");
}else {
this.tv.play("节目二");
}
}
}
}
//消费者--》观众
class Watcher extends Thread{
TV tv;
public Watcher(TV tv) {
this.tv=tv;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
tv.watch();
}
}
}
//产品--》节目
class TV{
//演员表演,观众等待flag=true
//观众观看,演员等待flag=false
String voice;//表演的节目
boolean flag=true;
//表演
public synchronized void play(String voice){
if(!flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("演员表演了:"+voice);
//通知观众观看
this.notifyAll();//通知唤醒
this.voice=voice;
this.flag=!this.flag;
}
//观看
public synchronized void watch(){
if(flag){
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("观看了:"+voice);
//通知演员表演
this.notifyAll();//通知唤醒
this.flag=!this.flag;
}
}
//输出:
演员表演了:节目一
观看了:节目一
演员表演了:节目二
观看了:节目二
演员表演了:节目一
观看了:节目一
演员表演了:节目二
观看了:节目二
演员表演了:节目一
观看了:节目一
演员表演了:节目二
观看了:节目二
演员表演了:节目一
观看了:节目一
演员表演了:节目二
观看了:节目二
演员表演了:节目一
观看了:节目一
演员表演了:节目二
观看了:节目二
线程池
public class TestPool {
public static void main(String[] args) {
//1.创建服务,创建线程池
//newFixedThreadPool 参数为池子(线程池)的大小
ExecutorService service= Executors.newFixedThreadPool(10);
//执行
service.execute(new MyThread());
service.execute(new MyThread());
service.execute(new MyThread());
service.execute(new MyThread());
//2.关闭连接
service.shutdown();
}
}
class MyThread implements Runnable{
@Override
public void run() {
System.out.println(Thread.currentThread().getName());
}
}
//输出:
pool-1-thread-2
pool-1-thread-3
pool-1-thread-1
pool-1-thread-4