首页 > 解决方案 > 如何在 Spring Boot AsyncConfigurer 中使用优先级队列

问题描述

我有一个应用程序,其中有多个线程从 jms 目标读取消息。侦听器线程读取消息,对其进行一些更改并调用不同类的其他几个方法。这些方法带有@Async注释,所有方法都使用自定义并行执行ThreadPoolTaskExecutor

@Override
public Executor getAsyncExecutor() {        
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(corePoolSize);
    executor.setMaxPoolSize(maxPoolSize);
    executor.setQueueCapacity(queueCapacity);
    executor.setKeepAliveSeconds(keepAliveSeconds);
    executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    executor.setTaskDecorator(new LoggingTaskDecorator());
    executor.initialize();
    return executor;
}

到目前为止,所有消息都被认为具有相同的优先级,一切都很好,因为LinkedBlockingQueue如果没有Executor线程可用,所有消息都会进入。

现在,需要从队列中读取的特定类型的消息被赋予比从队列中读取的任何其他消息更高的优先级。

目前,我正在使用“org.springframework.scheduling.concurrent.ThreadPoolTask​​Executor”,它没有提供任何可以将优先队列设置为阻塞队列实现的方法。

你能帮我解决这个问题吗?还是系统的现有设计无法适应这种变化?或者处理这种情况的最佳解决方案是什么?

谢谢 !

标签: javaspringspring-bootasynchronousthreadpoolexecutor

解决方案


通过简单地覆盖该createQueue方法。此外,您应该使用一种@Bean方法来创建 bean 的实例,这样 Spring 可以正确管理生命周期,这是一件小而重要的事情(否则关闭将无法正常工作)..

@Override
public Executor getAsyncExecutor() {
  return taskExecutor();
}        

@Bean    
public ThreadPoolTaskExecutor taskExecutor() {        
  ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor() {
    protected BlockingQueue<Runnable> createQueue(int queueCapacity) {
      return new PriorityBlockingQueue<>(queueCapacity);
    } 
  };
  executor.setCorePoolSize(corePoolSize);
  executor.setMaxPoolSize(maxPoolSize);
  executor.setQueueCapacity(queueCapacity);
  executor.setKeepAliveSeconds(keepAliveSeconds);
  executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
  executor.setTaskDecorator(new LoggingTaskDecorator());  
  return executor;
}

像这样的东西应该工作。该createQueue方法现在创建 aPriorityBlockingQueue而不是默认的LinkedBlockingQueue.


推荐阅读