java - 固定和动态调整工作线程的数量
问题描述
我目前正在实现一个数据容器/数据结构,它有一个 addRecord(final Redcord record) 和一个 close() 方法。
public class Container{
Container() {
}
public void addRecord(final Record record){
// do something with the record
// add the modified it to the container
}
public void close(){
}
}
由于要将记录存储到容器中需要完成几个步骤,所以我想在最终将它们添加到容器之前将这些步骤外包给线程。
我的问题是我不希望任何容器实例使用超过 4 个线程,并且如果我同时打开多个容器实例,总共不超过,例如,应该使用 12 个线程。除此之外,我希望一旦我创建了第 4 个容器,其他 3 个打开的容器中的每一个都应该丢失它们的一个线程,这样所有 4 个容器每个都可以使用 3 个线程。
我正在研究 ThreadPools 和 ExecutorServices。虽然设置大小为 12 的线程池很简单,但在使用 ExecutorServices 时很难将使用的线程数限制为 4。回想一下,我最多需要 12 个线程,这就是为什么我在 Container 实例之间共享一个大小为 12 的静态实例。
这就是我开始的
public class Container{
public static final ExecutorService SERVICE= Executors.newFixedThreadPool(12);
final List<Future<Entry>> _fRecords = new LinkedList<>();
Container() {
}
public void addRecord(final Record record){
_fRecords.add(SERVICE.submit(new Callable<Entry>{
@Override
public Entry call() throws Exception {
return record.createEntry();
}
}));
}
public void close() throws Exception {
for(final Future<Entry> f) {
f.get(); // and add the entry to the container
}
}
}
显然,这不能确保单个实例最多使用 4 个线程。此外,如果另一个容器需要几个线程,我看不出如何强制这些 Executorservices 关闭工作程序或重新分配它们。
问题:
由于 Redord 本身是一个接口,而 createEntry() 的运行时间取决于实际的实现,我想确保不同的容器不使用相同的线程/worker,即我不希望任何线程完成工作对于两个不同的容器。这背后的想法是,我不希望只处理“长期运行”记录的容器实例被只处理“短期运行记录”的容器减慢。如果这在概念上没有意义(向您提出问题),则无需为不同的容器设置隔离的线程/工作者。
实施我自己的 ExecutorService 是正确的方法吗?有人可以指出一个有类似问题/解决方案的项目吗?否则我想我必须实现自己的 ExecutorService 并在我的容器中共享它,还是有更好的解决方案?
目前,我正在使用 close 方法收集所有期货,因为容器必须按照它们到达的顺序存储记录/条目。显然,这需要很多时间,因此我想在 Future 完成后执行此操作。让额外的工作人员单独处理期货是否有意义,或者让我的工作人员完成这项工作是否更好,即进行线程交互。
解决方案
我不认为 Java 运行时提供了开箱即用的东西来满足您的需求。
我编写了自己的课程来满足此类需求(公共池+给定队列的限制)。
public static class ThreadLimitedQueue {
private final ExecutorService executorService;
private final int limit;
private final Object lock = new Object();
private final LinkedList<Runnable> pending = new LinkedList<>();
private int inProgress = 0;
public ThreadLimitedQueue(final ExecutorService executorService, final int limit) {
this.executorService = executorService;
this.limit = limit;
}
public void submit(Runnable runnable) {
final Runnable wrapped = () -> {
try {
runnable.run();
} finally {
onComplete();
}
};
synchronized (lock) {
if (inProgress < limit) {
inProgress++;
executorService.submit(wrapped);
} else {
pending.add(wrapped);
}
}
}
private void onComplete() {
synchronized (lock) {
final Runnable pending = this.pending.poll();
if (pending == null || inProgress > limit) {
inProgress--;
} else {
executorService.submit(pending);
}
}
}
}
在我的情况下,唯一的区别limit
是不变的,但你可以修改它。例如,您可以替换int limit
为,Supplier<Integer> limitFunction
并且此功能必须提供动态限制,例如
Supplier<Integer> limitFunction = 12 / containersList.size();
让它更健壮(例如,如果containersList为空或超过12怎么办)
推荐阅读
- postman - 如何通过数据文件跳过纽曼中的第一个/当前请求?
- css - 如何在 react-native 中将 View 组件左右对齐
- php - foreach 的序列化数据列表
- react-native - React Native 中的 PDF 下载器
- python - 如何在 python 烧瓶中对两个或多个 CSV 文件进行验证
- neural-network - 为什么 1st epoch 在训练神经网络模型时需要大量时间?
- angular - 组件中的变量仅在 Angular 4 服务中的第二个函数调用后更新
- java - 如何在 Android 中第一次使用 EventBus
- amazon-web-services - npm run build 不会在 elasticbeanstalk 上创建构建目录
- mysql - 如何用mysql查询替换日期