首页 > 解决方案 > 为中断分组线程

问题描述

我在线程分组方面遇到了一些问题,这是一个中断(取消一些连续的过程)。我有一个由方法管理的执行入口点,ThreadPoolTaskExecutor它与方法上的@Async注释一起使用。在异步调用并运行此方法后,在其中我有一些其他异步工作要由 some 完成new Thread()。考虑到当我有一些并行运行的任务时,我可能会遇到这样的情况,我需要确定它是什么任务,并interrupt()给定任务及其 在任务执行期间创建的子线程。

我已经调查了一些关于这个问题的事情。我可以设置一些ThreadGroupThreadPoolTaskExecutor得到这个组和interrupt()它。这可能会停止在任务执行期间创建的正在运行的任务和线程,但我需要根据任务 ID动态 的东西,以免停止其他正在运行的任务,因为通过设置所有 Executor 的任务给定组。访问创建的任务和调用有助于停止当前任务,但不是线程ThreadGroupThreadGroupThreadPoolTaskExecutor().setThreadGroup()Future<>future.cancel()

    @Bean
    public TaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(50);
        executor.setThreadNamePrefix("thread-");
        executor.initialize();
        return executor;
    }
    public class Task {
       private Long id;
       //getters and setters
    }
   @Service
    public class AsyncService {

        @Async("taskExecutor")
        public Future<Task> startAsyncProcess(Task task){
            //this thread is not killed by future.cancel()
            new Thread(() -> System.out.println("Some continuous async db write")).start();
            //Some other continuous process, killed by future.cancel()
            return AsyncResult.forValue(task);
        }

    }
    @Service
    public class TaskService {

        @Autowired
        private AsyncService asyncService;

        public void startProcess(Task task){
          Future<Task> future = asyncService.startAsyncProcess(task);
          // future.cancel();
        }

    }

我希望对这些线程进行一些完全中断。也许我需要为此创建一些管理器并用于ThreadFactory生成这些线程并将它们分成小组使用ThreadGroup?提前致谢!

标签: javaspringmultithreading

解决方案


这不是很漂亮,但它有效:实现一个CancelWithRunnableFuture可以提供的类型Runnable...Runnable在方法内部调用cancel()

@Service
class AsyncService {
    @Async("taskExecutor")
    public Future<Task> startAsyncProcess(Task task) {
        Thread threadToInterruptOnCancel = new Thread(() -> System.out.println("Some continuous async db write"));
        threadToInterruptOnCancel.start();

        return new CancelWithRunnableFuture(
                       AsyncResult.forValue(task),
                       () -> threadToKillOnCancel.interrupt());
    }
}

class CancelWithRunnableFuture<V> implements Future<V> {
    private Future<V> delegate;
    private Runnable runOnCancel;

    public CancelWithRunnableFuture(Future<V> delegate, Runnable runOnCancel) {
        this.delegate = delegate;
        this.runOnCancel = runOnCancel;
    }

    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        //
        // The Runnable is run here!
        //
        runOnCancel.run();

        return delegate.cancel(mayInterruptIfRunning);
    }

    @Override
    public boolean isCancelled() {
        return delegate.isCancelled();
    }

    @Override
    public boolean isDone() {
        return delegate.isDone();
    }

    @Override
    public V get() throws InterruptedException, ExecutionException {
        return delegate.get();
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate.get(timeout, unit);
    }

}

推荐阅读