多线程
创建线程的四种方式
1、继承Thread
Thread01 thread = new Thread01();
thread.start();
2、实现Runable接口
Runable01 runable = new Runable();
new Thread(runable).start();
3、实现Callable接口 + FutureTask(可以拿到返回结果,可以处理异常)
Callable01 callable01 = new Callable01();
FutureTask<Integer> futureTask = new FutureTask<>(new callable01());
new Thread(futureTask).start();
Integer result = futureTask.get(); // 阻塞等待,直到整个线程执行完成,获得返回结果
- FutureTask源码
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
// Runnable也可以获得返回结果
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
4、线程池 (最常用) ,应该将所有多线程异步任务都交给线程池执行
Excutors(少用)
public static ExecutorService service = Executors.newFixedThreadPool(10);
原生ThreadExcutorPool(常用)
7大参数源码
/**
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @param handler the handler to use when execution is blocked
* because the thread bounds and queue capacities are reached
*/
public ThreadPoolExecutor(
int corePoolSize, // 核心线程数,创建好就准备就绪的线程数量,一直存在,空闲也不会被释放,除非设置allowCoreThreadTimeOut
int maximumPoolSize, // 最大线程数量,用于控制资源,空闲线程超过指定的keepAliveTime时间会被释放
long keepAliveTime, // 存活时间。如果当前的线程数大于核心线程数,那么只要线程空闲时间大于指定keepAliveTime,就释放该线程
TimeUnit unit, // 存活时间的时间单位
BlockingQueue<Runnable> workQueue, // 阻塞式工作队列,如果任务很多,就会将多的任务放进该队列,只要有线程空闲就会去队列取新任务执行
ThreadFactory threadFactory, // 线程的创建工厂
RejectedExecutionHandler handler // 如果workQueue工作队列满了,按照指定的handler方法执行拒绝策略执行任务
)
开发中一般只用使用线程池ThreadExecutorPool,可以降低线程创建和销毁带来的性能损耗、提高响应速度、提高线程可管理性
线程池原理
其实java线程池的实现原理很简单,说白了就是一个线程集合workerSet和一个阻塞队列workQueue。当用户向线程池提交一个任务(也就是线程)时,线程池会先将任务放入workQueue中。workerSet中的线程会不断的从workQueue中获取线程然后执行。当workQueue中没有任务的时候,worker就会阻塞,直到队列中有任务了就取出来继续执行。
- corePoolSize: 规定线程池有几个线程(worker)在运行。
- maximumPoolSize: 当workQueue满了,不能添加任务的时候,这个参数才会生效。规定线程池最多只能有多少个线程(worker)在执行。
- keepAliveTime: 超出corePoolSize大小的那些线程的生存时间,这些线程如果长时间没有执行任务并且超过了keepAliveTime设定的时间,就会消亡。
- unit: 生存时间对于的单位
- workQueue: 存放任务的队列
- threadFactory: 创建线程的工厂
- handler: 当workQueue已经满了,并且线程池线程数已经达到maximumPoolSize,将执行拒绝策略。
任务提交后的流程分析
用户通过submit提交一个任务。线程池会执行如下流程:
- 判断当前运行的worker数量是否超过corePoolSize, 如果不超过corePoolSize。就创建一个worker直接执行该任务。—— 线程池最开始是没有worker在运行的
- 如果正在运行的worker数量超过或者等于corePoolSize,那么就将该任务加入到workQueue队列中去。
- 如果workQueue队列满了,也就是offer方法返回false的话,就检查当前运行的worker数量是否小于maximumPoolSize,如果小于就创建一个worker直接执行该任务。
- 如果当前运行的worker数量是否大于等于maximumPoolSize,那么就执行RejectedExecutionHandler来拒绝这个任务的提交。
源码解析
我们先来看一下ThreadPoolExecutor中的几个关键属性。
//这个属性是用来存放 当前运行的worker数量以及线程池状态的
//int是32位的,这里把int的高3位拿来充当线程池状态的标志位,后29位拿来充当当前运行worker的数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//存放任务的阻塞队列
private final BlockingQueue<Runnable> workQueue;
//worker的集合,用set来存放
private final HashSet<Worker> workers = new HashSet<Worker>();
//历史达到的worker数最大值
private int largestPoolSize;
//当队列满了并且worker的数量达到maxSize的时候,执行具体的拒绝策略
private volatile RejectedExecutionHandler handler;
//超出coreSize的worker的生存时间
private volatile long keepAliveTime;
//常驻worker的数量
private volatile int corePoolSize;
//最大worker的数量,一般当workQueue满了才会用到这个参数
private volatile int maximumPoolSize;
1. 提交任务相关源码
下面是execute方法的源码
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
//workerCountOf(c)会获取当前正在运行的worker数量
if (workerCountOf(c) < corePoolSize) {
//如果workerCount小于corePoolSize,就创建一个worker然后直接执行该任务
if (addWorker(command, true))
return;
c = ctl.get();
}
//isRunning(c)是判断线程池是否在运行中,如果线程池被关闭了就不会再接受任务
//后面将任务加入到队列中
if (isRunning(c) && workQueue.offer(command)) {
//如果添加到队列成功了,会再检查一次线程池的状态
int recheck = ctl.get();
//如果线程池关闭了,就将刚才添加的任务从队列中移除
if (! isRunning(recheck) && remove(command))
//执行拒绝策略
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//如果加入队列失败,就尝试直接创建worker来执行任务
else if (!addWorker(command, false))
//如果创建worker失败,就执行拒绝策略
reject(command);
}
添加worker的方法addWorker源码
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
//使用自旋+cas失败重试来保证线程竞争问题
for (;;) {
//先获取线程池的状态
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池是关闭的,或者workQueue队列非空,就直接返回false,不做任何处理
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);
//根据入参core 来判断可以创建的worker数量是否达到上限,如果达到上限了就拒绝创建worker
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//没有的话就尝试修改ctl添加workerCount的值。这里用了cas操作,如果失败了下一个循环会继续重试,直到设置成功
if (compareAndIncrementWorkerCount(c))
//如果设置成功了就跳出外层的那个for循环
break retry;
//重读一次ctl,判断如果线程池的状态改变了,会再重新循环一次
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
//创建一个worker,将提交上来的任务直接交给worker
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//加锁,防止竞争
mainLock.lock();
try {
int c = ctl.get();
int rs = runStateOf(c);
//还是判断线程池的状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//如果worker的线程已经启动了,会抛出异常
if (t.isAlive())
throw new IllegalThreadStateException();
//添加新建的worker到线程池中
workers.add(w);
int s = workers.size();
//更新历史worker数量的最大值
if (s > largestPoolSize)
largestPoolSize = s;
//设置新增标志位
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//如果worker是新增的,就启动该线程
if (workerAdded) {
t.start();
//成功启动了线程,设置对应的标志位
workerStarted = true;
}
}
} finally {
//如果启动失败了,会触发执行相应的方法
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
2. Worker的结构
Worker是ThreadPoolExecutor内部定义的一个内部类。我们先看一下Worker的继承关系
private final class Worker extends AbstractQueuedSynchronizer implements Runnable
它实现了Runnable接口,所以可以拿来当线程用。同时它还继承了AbstractQueuedSynchronizer同步器类,主要用来实现一个不可重入的锁。
一些属性还有构造方法:
//运行的线程,前面addWorker方法中就是直接通过启动这个线程来启动这个worker
final Thread thread;
//当一个worker刚创建的时候,就先尝试执行这个任务
Runnable firstTask;
//记录完成任务的数量
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
//创建一个Thread,将自己设置给他,后面这个thread启动的时候,也就是执行worker的run方法
this.thread = getThreadFactory().newThread(this);
}
worker的run方法
public void run() {
//这里调用了ThreadPoolExecutor的runWorker方法
runWorker(this);
}
ThreadPoolExecutor的runWorker方法
final void runWorker(Worker w) {
//获取当前线程
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
//执行unlock方法,允许其他线程来中断自己
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//如果前面的firstTask有值,就直接执行这个任务
//如果没有具体的任务,就执行getTask()方法从队列中获取任务
//这里会不断执行循环体,除非线程中断或者getTask()返回null才会跳出这个循环
while (task != null || (task = getTask()) != null) {
//执行任务前先锁住,这里主要的作用就是给shutdown方法判断worker是否在执行中的
//shutdown方法里面会尝试给这个线程加锁,如果这个线程在执行,就不会中断它
w.lock();
//判断线程池状态,如果线程池被强制关闭了,就马上退出
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//执行任务前调用。预留的方法,可扩展
beforeExecute(wt, task);
Throwable thrown = null;
try {
//真正的执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//执行任务后调用。预留的方法,可扩展
afterExecute(task, thrown);
}
} finally {
task = null;
//记录完成的任务数量
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
下面来看一下getTask()方法,这里面涉及到keepAliveTime的使用,从这个方法我们可以看出先吃池是怎么让超过corePoolSize的那部分worker销毁的。
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 如果线程池已经关闭了,就直接返回null,
//如果这里返回null,调用的那个worker就会跳出while循环,然后执行完销毁线程
//SHUTDOWN状态表示执行了shutdown()方法
//STOP表示执行了shutdownNow()方法
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
//获取当前正在运行中的worker数量
int wc = workerCountOf(c);
// 如果设置了核心worker也会超时或者当前正在运行的worker数量超过了corePoolSize,就要根据时间判断是否要销毁线程了
//其实就是从队列获取任务的时候要不要设置超时间时间,如果超过这个时间队列还没有任务进来,就会返回null
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//如果上一次循环从队列获取到的未null,这时候timedOut就会为true了
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//通过cas来设置WorkerCount,如果多个线程竞争,只有一个可以设置成功
//最后如果没设置成功,就进入下一次循环,说不定下一次worker的数量就没有超过corePoolSize了,也就不用销毁worker了
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//如果要设置超时时间,就设置一下咯
//过了这个keepAliveTime时间还没有任务进队列就会返回null,那worker就会销毁
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//如果r为null,就设置timedOut为true
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
3. 添加Callable任务的实现源码
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
要添加一个有返回值的任务的实现也很简单。其实就是对任务做了一层封装,将其封装成Future,然后提交给线程池执行,最后返回这个future。
这里的 newTaskFor(task) 方法会将其封装成一个FutureTask类。
外部的线程拿到这个future,执行get()方法的时候,如果任务本身没有执行完,执行线程就会被阻塞,直到任务执行完。
下面是FutureTask的get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
//判断状态,如果任务还没执行完,就进入休眠,等待唤醒
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//返回值
return report(s);
}
FutureTask中通过一个state状态来判断任务是否完成。当run方法执行完后,会将state状态置为完成,同时唤醒所有正在等待的线程。我们可以看一下FutureTask的run方法
public void run() {
//判断线程的状态
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//执行call方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
//这个方法里面会设置返回内容,并且唤醒所以等待中的线程
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
4. shutdown和shutdownNow方法的实现
shutdown方法会将线程池的状态设置为SHUTDOWN,线程池进入这个状态后,就拒绝再接受任务,然后会将剩余的任务全部执行完
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否可以关闭线程
checkShutdownAccess();
//设置线程池状态
advanceRunState(SHUTDOWN);
//尝试中断worker
interruptIdleWorkers();
//预留方法,留给子类实现
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有的worker
for (Worker w : workers) {
Thread t = w.thread;
//先尝试调用w.tryLock(),如果获取到锁,就说明worker是空闲的,就可以直接中断它
//注意的是,worker自己本身实现了AQS同步框架,然后实现的类似锁的功能
//它实现的锁是不可重入的,所以如果worker在执行任务的时候,会先进行加锁,这里tryLock()就会返回false
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
shutdownNow做的比较绝,它先将线程池状态设置为STOP,然后拒绝所有提交的任务。最后中断左右正在运行中的worker,然后清空任务队列。
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
//检测权限
advanceRunState(STOP);
//中断所有的worker
interruptWorkers();
//清空任务队列
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//遍历所有worker,然后调用中断方法
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
CompletableFuture异步编排
- 创建异步对象
无回调结果的异步方法:CompletableFuture.runAsync(Runnable runnable, Executor executor)
public static void main(String[] args) {
System.out.println("main。。。start。。。");
CompletableFuture.runAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
}, executor);
System.out.println("main。。。end。。。");
}
带有回调结果的异步方法:CompletableFuture.supplyAsyn(Supplier supplier, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main。。。start。。。");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("运行结果:" + i);
return i;
}, executor);
Integer integer = future.get();
System.out.println("main。。。end。。。" + integer);
}
- 计算完成时回调方法
方法完成后的感知 whenCompleteAsync 和 exceptionally
public static void main(String[] args) throws ExecutionException, InterruptedException {
System.out.println("main。。。start。。。");
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).whenCompleteAsync((res, exception) -> {
// 只能得到异常信息,无法修改返回数据
System.out.println("异步任务完成了,结果是:" + res + "异常是:" + exception);
}).exceptionally(throwable -> {
// 可以获取异常信息,同时可以返回默认值
return 10; // 修改返回值future.get() 结果为10
});
Integer integer = future.get();
System.out.println("main。。。end。。。" + integer);
}
/** 结果:
main。。。start。。。
当前线程:11
异步任务完成了,结果是:null异常是:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main。。。end。。。10
**/
- 方法完成后的处理 handle
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).handle((res, thr) -> {
if(res != null) {
return res * 2; // 修改future返回结果
}
if(thr != null) { // 异常
return 0
}
return 0;
});
- 线程串行化
1、thenRunAsync:不能获取到上一步执行结果,无返回值
CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).thenRunAsync(() -> { // 没有返回值, 不能获取到上一步的执行结果
System.out.println("任务2启动了。。");
}, executor);
2、thenAcceptAsync:能接收到上一步结果但无返回值
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).thenAcceptAsync((res) -> { // 没有返回值
System.out.println("任务2启动了。。" + res);
}, executor);
3、thenApplAsyncy:能接收到上一步返回结果,也有返回值
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 0;
System.out.println("运行结果:" + i);
return i;
}, executor).thenApplyAsync((res) -> { // 没有返回值
System.out.println("任务2启动了。。" + res);
return "hello" + res;
}, executor);
- 两任务组合
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("当前线程:" + Thread.currentThread().getId());
int i = 10 / 4;
System.out.println("运行结果:" + i);
return i;
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "hello";
}, executor);
future1.runAfterBothAsync(future2, () -> { // 无法感知前两个任务的结果
System.out.println("任务3开始。。");
}, executor);
future1.thenAcceptBothAsync(future2, (f1, f2) -> { // 能获取到前两个任务的结果
System.out.println("任务3开始。。。之前的结果:" + f1 + "=>" +f2);
},executor);
// 既能获取到前两个任务的返回结果, 又能最终的返回结果future
CompletableFuture<String> future = future1.thenCombineAsync(future2, (f1, f2) -> {
return f1 + ":" + f2 + "-> haha";
}, executor);
// 两个任务只要有一个完成,就执行任务3, 不能接受结果,没有返回值
future1.runAfterEitherAsync(future2, () -> {
System.out.println("任务3开始。。。之前的结果:" + res);
}, executor);
// 两个任务只要有一个完成,就执行任务3, 感知结果,自己没有返回值
future1.acceptEitherAsync(future2, (res) -> {
System.out.println("任务3开始。。。之前的结果:" + res);
}, executor);
// 两个任务只要有一个完成,就执行任务3, 既能感知结果,自己也有返回值
CompletableFuture<Object> future = future1.applyToEitherAsync(future2, res -> {
System.out.println("任务3开始。。。之前的结果:" + res);
}executor);
-
多任务组合操作
// 执行完所有任务才执行 allOf.get() CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3); System.out.println("main。。。end。。。" + allOf.get()); // 有一个执行完就执行 anyOf.get() CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3); System.out.println("main。。。end。。。" + anyOf.get());
Java异步实现
一、创建线程
@Test
public void test0() throws Exception {
System.out.println("main函数开始执行");
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
System.out.println("===task start===");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("===task finish===");
}
});
thread.start();
System.out.println("main函数执行结束");
}
二、Future
jdk8之前的实现方式,在JUC下增加了Future,从字面意思理解就是未来的意思,但使用起来却着实有点鸡肋,并不能实现真正意义上的异步,获取结果时需要阻塞线程,或者不断轮询。
@Test
public void test1() throws Exception {
System.out.println("main函数开始执行");
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("===task start===");
Thread.sleep(5000);
System.out.println("===task finish===");
return 3;
}
});
//这里需要返回值时会阻塞主线程,如果不需要返回值使用是OK的。倒也还能接收
//Integer result=future.get();
System.out.println("main函数执行结束");
System.in.read();
}
三、CompletableFuture
使用原生的CompletableFuture实现异步操作,加上对lambda的支持,可以说实现异步任务已经发挥到了极致。
@Test
public void test2() throws Exception {
System.out.println("main函数开始执行");
ExecutorService executor = Executors.newFixedThreadPool(2);
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(new Supplier<Integer>() {
@Override
public Integer get() {
System.out.println("===task start===");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("===task finish===");
return 3;
}
}, executor);
future.thenAccept(e -> System.out.println(e));
System.out.println("main函数执行结束");
}
四、Spring的Async注解
使用spring实现异步需要开启注解,可以使用xml方式或者java config的方式。
xml方式: <task:annotation-driven />
<task:annotation-driven executor="executor" />
<task:executor id="executor"
pool-size="2" 线程池的大小
queue-capacity="100" 排队队列长度
keep-alive="120" 线程保活时间(单位秒)
rejection-policy="CALLER_RUNS" 对拒绝的任务处理策略 />
java方式:
@EnableAsync
public class MyConfig {
@Bean
public TaskExecutor executor(){
ThreadPoolTaskExecutor executor=new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); //核心线程数
executor.setMaxPoolSize(20); //最大线程数
executor.setQueueCapacity(1000); //队列大小
executor.setKeepAliveSeconds(300); //线程最大空闲时间
executor.setThreadNamePrefix("fsx-Executor-"); //指定用于新创建的线程名称的前缀。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
(1)@Async
@Test
public void test3() throws Exception {
System.out.println("main函数开始执行");
myService.longtime();
System.out.println("main函数执行结束");
}
@Async
public void longtime() {
System.out.println("我在执行一项耗时任务");
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("完成");
}
(2)AsyncResult
如果需要返回值,耗时方法返回值用AsyncResult包装。
@Test
public void test4() throws Exception {
System.out.println("main函数开始执行");
Future<Integer> future=myService.longtime2();
System.out.println("main函数执行结束");
System.out.println("异步执行结果:"+future.get());
}
@Async
public Future<Integer> longtime2() {
System.out.println("我在执行一项耗时任务");
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("完成");
return new AsyncResult<>(3);
}