java - 为中断分组线程
问题描述
我在线程分组方面遇到了一些问题,这是一个中断(取消一些连续的过程)。我有一个由方法管理的执行入口点,ThreadPoolTaskExecutor
它与方法上的@Async
注释一起使用。在异步调用并运行此方法后,在其中我有一些其他异步工作要由 some 完成new Thread()
。考虑到当我有一些并行运行的任务时,我可能会遇到这样的情况,我需要确定它是什么任务,并interrupt()
给定任务及其 在任务执行期间创建的子线程。
我已经调查了一些关于这个问题的事情。我可以设置一些ThreadGroup
,ThreadPoolTaskExecutor
得到这个组和interrupt()
它。这可能会停止在任务执行期间创建的正在运行的任务和线程,但我需要根据任务 ID动态 的东西,以免停止其他正在运行的任务,因为通过设置所有 Executor 的任务给定组。访问创建的任务和调用有助于停止当前任务,但不是子线程ThreadGroup
ThreadGroup
ThreadPoolTaskExecutor().setThreadGroup()
Future<>
future.cancel()
@Bean
public TaskExecutor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("thread-");
executor.initialize();
return executor;
}
public class Task {
private Long id;
//getters and setters
}
@Service
public class AsyncService {
@Async("taskExecutor")
public Future<Task> startAsyncProcess(Task task){
//this thread is not killed by future.cancel()
new Thread(() -> System.out.println("Some continuous async db write")).start();
//Some other continuous process, killed by future.cancel()
return AsyncResult.forValue(task);
}
}
@Service
public class TaskService {
@Autowired
private AsyncService asyncService;
public void startProcess(Task task){
Future<Task> future = asyncService.startAsyncProcess(task);
// future.cancel();
}
}
我希望对这些线程进行一些完全中断。也许我需要为此创建一些管理器并用于ThreadFactory
生成这些线程并将它们分成小组使用ThreadGroup
?提前致谢!
解决方案
这不是很漂亮,但它有效:实现一个CancelWithRunnableFuture
可以提供的类型Runnable
...Runnable
在方法内部调用cancel()
。
@Service
class AsyncService {
@Async("taskExecutor")
public Future<Task> startAsyncProcess(Task task) {
Thread threadToInterruptOnCancel = new Thread(() -> System.out.println("Some continuous async db write"));
threadToInterruptOnCancel.start();
return new CancelWithRunnableFuture(
AsyncResult.forValue(task),
() -> threadToKillOnCancel.interrupt());
}
}
class CancelWithRunnableFuture<V> implements Future<V> {
private Future<V> delegate;
private Runnable runOnCancel;
public CancelWithRunnableFuture(Future<V> delegate, Runnable runOnCancel) {
this.delegate = delegate;
this.runOnCancel = runOnCancel;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
//
// The Runnable is run here!
//
runOnCancel.run();
return delegate.cancel(mayInterruptIfRunning);
}
@Override
public boolean isCancelled() {
return delegate.isCancelled();
}
@Override
public boolean isDone() {
return delegate.isDone();
}
@Override
public V get() throws InterruptedException, ExecutionException {
return delegate.get();
}
@Override
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
return delegate.get(timeout, unit);
}
}
推荐阅读
- javascript - 如何使用 acceptWidgets 为“grid-stack-item”设置预定义的高度或宽度
- javascript - 从表单添加动态生成的字段
- php - facebook bot PAGE 中的发送消息按钮
- ultraedit - UltraEdit / Unix 正则表达式。我想用名称替换所有行 + 空格 + 带有名称的数字 + # + 数字 SANTIAGO 80,00
- mysql - 如何在 SQL 中存储包含更多用户的组?
- javascript - 我的 Vuejs 前端和我的 node express 后端如何相互交互,我需要 vue-cli 吗?
- linux - 在 sudo 语句中设置环境变量 - Linux shell
- android - 如何使用“minifyEnabled true”调试应用程序并在发布模式下禁用 MultiDexApplication
- powershell - Powershell循环从导入的CSV读入数组
- android - 如何在 Xamarin.Forms 上使用 F# 在后台运行任务?