Spring中基于@Async的异步线程池构建与使用
在处理队列中的请求或者与第三方系统的交互时,异步处理较为常见,为充分利用系统资源,常规多采用构建线程池的方式,但线程池的构建成本高、代码维护困难;Spring 3.x 引入了@Async可完美解决这类异步处理难题,简洁,易用,可读性强。本文就以实际应用中,处理redis队列中异步请求为例,结合前辈们的总结和自己的实际应用,简要概述@Async在实际应用的特点。
关于异步调用
何为异步调用
同步就是整个处理过程顺序执行,当各个过程都执行完毕,并返回结果。
异步调用则是只是发送了调用的指令,调用者无需等待被调用的方法完全执行完毕;而是继续执行下面的流程。
常规的多线程处理均为异步调用,例如, 在某个调用中,需要顺序调用 A, B, C三个过程方法;如他们都是同步调用,则需要将他们都顺序执行完毕之后,方算作过程执行完毕; 如B为一个异步的调用方法,则在执行完A之后,调用B,并不等待B完成,而是执行开始调用C,待C执行完毕之后,就意味着这个过程执行完毕了。
常规的异步调用处理方式
在Java中,一般在处理类似的场景之时,都是基于创建独立的线程去完成相应的异步调用逻辑,通过主线程和不同的线程之间的执行流程,从而在启动独立的线程之后,主线程继续执行而不会产生停滞等待的情况。
@Async介绍
在Spring中,基于@Async标注的方法,称之为异步方法;这些方法将在执行的时候,将会在独立的线程中被执行,调用者无需等待它的完成,即可继续其他的操作。
@Async的构建与使用
基于注解的使用方法
关于基于xml的使用方式,不做赘述,基于注解的使用方式包括如下三步:
- 启动类加上@EnableAsync
- 配置类中完成异步线程池TaskExecutor的导入
- 需要异步调用的方法加上@Async
异步线程池TaskExecutor
异步线程池接口TaskExecutor继承JDK的Executor,只是在Spring框架内部完成该并发执行接口的重新定义。其实现类与接口层级图如下:
这里最常用的是ThreadPoolTaskExecutor ,其实质是对java.util.concurrent.ThreadPoolExecutor的包装,推荐使用。
配置类中自定义异步线程池:
/** * 自定义异步线程池 * @return */ @Bean public TaskExecutor taskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setThreadNamePrefix("Anno-Executor"); executor.setMaxPoolSize(10); // 设置拒绝策略 executor.setRejectedExecutionHandler(new RejectedExecutionHandler() { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // ..... } }); // 使用预定义的异常处理类 // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); return executor; }
@Async定义异步任务
使用@Async定义异步任务包括如下三种方式:
- 最简单的异步调用,返回值为void
- 带参数的异步调用异步方法
- 异步调用返回Future
代码示例:
@Component public class AsyncDemo { private static final Logger log = LoggerFactory.getLogger(AsyncDemo.class); /** * 最简单的异步调用,返回值为void */ @Async public void asyncInvokeSimplest() { log.info("asyncSimplest"); } /** * 带参数的异步调用 异步方法可以传入参数 * * @param s */ @Async public void asyncInvokeWithParameter(String s) { log.info("asyncInvokeWithParameter, parementer={}", s); } /** * 异步调用返回Future * * @param i * @return */ @Async public Future<String> asyncInvokeReturnFuture(int i) { log.info("asyncInvokeReturnFuture, parementer={}", i); Future<String> future; try { Thread.sleep(1000 * 1); future = new AsyncResult<String>("success:" + i); } catch (InterruptedException e) { future = new AsyncResult<String>("error"); } return future; } }
调用示例:
asyncDemo.asyncInvokeSimplest(); asyncDemo.asyncInvokeWithException("test"); Future<String> future = asyncDemo.asyncInvokeReturnFuture(100); System.out.println(future.get());
对异步方法的异常处理
在调用方法时,可能出现方法中抛出异常的情况。在异步中主要有有两种异常处理方法:
- 对于方法返回值是Futrue的异步方法:
- 在调用future的get时捕获异常
- 在异常方法中直接捕获异常
- 对于返回值是void的异步方法:通过AsyncUncaughtExceptionHandler处理异常
代码如下:
/** * 带参数的异步调用 异步方法可以传入参数 * 对于返回值是void,异常会被AsyncUncaughtExceptionHandler处理掉 * @param s */ @Async public void asyncInvokeWithException(String s) { log.info("asyncInvokeWithParameter, parementer={}", s); throw new IllegalArgumentException(s); } /** * 异常调用返回Future * 对于返回值是Future,不会被AsyncUncaughtExceptionHandler处理,需要我们在方法中捕获异常并处理 * 或者在调用方在调用Futrue.get时捕获异常进行处理 * * @param i * @return */ @Async public Future<String> asyncInvokeReturnFuture(int i) { log.info("asyncInvokeReturnFuture, parementer={}", i); Future<String> future; try { Thread.sleep(1000 * 1); future = new AsyncResult<String>("success:" + i); throw new IllegalArgumentException("a"); } catch (InterruptedException e) { future = new AsyncResult<String>("error"); } catch(IllegalArgumentException e){ future = new AsyncResult<String>("error-IllegalArgumentException"); } return future; }
实现AsyncConfigurer接口对异常线程池更加细粒度的控制:
- 创建线程自己的线程池
- 对void方法抛出的异常处理的类AsyncUncaughtExceptionHandler
代码如下:
@Service public class MyAsyncConfigurer implements AsyncConfigurer{ private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class); @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor(); threadPool.setCorePoolSize(1); threadPool.setMaxPoolSize(1); threadPool.setWaitForTasksToCompleteOnShutdown(true); threadPool.setAwaitTerminationSeconds(60 * 15); threadPool.setThreadNamePrefix("MyAsync-"); threadPool.initialize(); return threadPool; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new MyAsyncExceptionHandler(); } /** * 自定义异常处理类 * @author hry * */ class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler { @Override public void handleUncaughtException(Throwable throwable, Method method, Object... obj) { log.info("Exception message - " + throwable.getMessage()); log.info("Method name - " + method.getName()); for (Object param : obj) { log.info("Parameter value - " + param); } } } }
@Async调用中的事务处理机制
在@Async标注的方法,同时也适用了@Transactional进行了标注;在其调用数据库操作之时,将无法产生事务管理的控制,原因就在于其是基于异步处理的操作。
那该如何给这些操作添加事务管理呢?可以将需要事务管理操作的方法放置到异步方法内部,在内部被调用的方法上添加@Transactional.
例如: 方法A,使用了@Async/@Transactional来标注,但是无法产生事务控制的目的。方法B,使用了@Async来标注, B中调用了C、D,C/D分别使用@Transactional做了标注,则可实现事务控制的目的。
实际业务使用
目前的业务中,主要有两块需使用异步线程池完成快速、高效、简便的异步处理,抛开业务,都是获取队列中请求,并发异步处理,再调用第三方接口返回数据,或者入库用以后续生成报表。部分代码如下:
启动类添加@EnableAsync,完成对于Spring异步调用的支持,不做赘述。
创建异步线程池:
@Configuration public class ThreadPoolConfig { @Bean public TaskExecutor videoRetrievalPool() { ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor(); exec.setCorePoolSize(VideoSearchConstant.threadPoolSize); exec.setMaxPoolSize(VideoSearchConstant.threadPoolSize); exec.setAllowCoreThreadTimeOut(true); exec.setKeepAliveSeconds(100);
// 线程池常用处理策略 , 不做赘述 exec.setThreadNamePrefix("video-"); exec.initialize(); return exec; } }
异步任务耗时较长,为避免频繁刷新队列,暂时可采用定时作业的形式:
@Async("videoRetrievalPool") @Scheduled(fixedRate = 1000) public void scheduleRetrieval() { ... ... }