java - 处理 FixedThreadPool 中的背压
问题描述
如何使用线程池处理 Java 中的背压?
如何拒绝新任务,使提交的任务不超过N个。N - 是提交队列中允许的最大任务数,包括新的、正在运行的、暂停(未完成)的任务。
用例
用户提交运行一段时间的计算任务。有时,有很多用户同时提交任务。如果已经提交了N个任务,如何拒绝新任务。
换句话说,提交(未完成、已启动或未启动)任务的总数不能大于N。
示例代码
这是完整版,下面是简短的片段。
一项长期运行的任务。计算任务。
public class CalculationTask {
public CalculationTask(final String name) {
this.name = name;
}
public CalculationResult calculate() {
final long waitTimeMs = MIN_WAIT_TIME_MS + RANDOM.nextInt(MAX_WAIT_TIME_MS);
sleep(waitTimeMs);
final int result = Math.abs(RANDOM.nextInt());
final String text = "This is result: " + result;
final CalculationResult calculationResult = new CalculationResult(name, text, result);
System.out.println("Calculation finished: " + calculationResult);
return calculationResult;
}
}
它的结果。计算结果。
public class CalculationResult {
private final String taskName;
private final String text;
private final Integer number;
// Getters, setters, constructor, toString.
}
这就是我提交工作的方式。计算经纪人。
public class CalculationBroker {
private static final int MAX_WORKERS_NUMBER = 5;
private final ExecutorService executorService = Executors.newFixedThreadPool(MAX_WORKERS_NUMBER);
private final Map<String, CalculationResult> calculationCache = new ConcurrentHashMap<>();
public CompletableFuture<CalculationResult> submit(final CalculationTask calculationTask) {
final CalculationResult calculationResultCached = calculationCache.get(calculationTask.getName());
if (calculationResultCached != null) {
return CompletableFuture.completedFuture(calculationResultCached);
}
System.out.println("Calculation submitted: " + calculationTask.getName());
final CompletableFuture<CalculationResult> calculated = CompletableFuture
.supplyAsync(calculationTask::calculate, executorService);
calculated.thenAccept(this::updateCache);
return calculated;
}
private void updateCache(final CalculationResult calculationResult) {
calculationCache.put(calculationResult.getTaskName(), calculationResult);
}
}
这就是我将它们一起运行的方式。主要。
public class Main {
public static void main(String[] args) {
final int N_TASKS = 100;
final CalculationBroker calculationBroker = new CalculationBroker();
final List<CompletableFuture<CalculationResult>> completableFutures = new ArrayList<>();
for (int i = 0; i < N_TASKS; i++) {
final CalculationTask calculationTask = createCalculationTask(i);
final CompletableFuture<CalculationResult> calculationResultCompletableFuture =
calculationBroker.submit(calculationTask);
completableFutures.add(calculationResultCompletableFuture);
}
calculationBroker.close();
}
private static CalculationTask createCalculationTask(final int counter) {
return new CalculationTask("CalculationTask_" + counter);
}
}
这是输出。
2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_97.
2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_98.
2020-05-23 14:14:53 [main] INFO c.y.t.backperssure.CalculationBroker – Calculation submitted: CalculationTask_99.
2020-05-23 14:14:54 [pool-1-thread-3] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_2', text='This is result: 1081871544', number=1081871544, durationMs=1066}
2020-05-23 14:14:55 [pool-1-thread-1] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_0', text='This is result: 1942553785', number=1942553785, durationMs=1885}
2020-05-23 14:14:56 [pool-1-thread-5] INFO c.y.t.backperssure.CalculationTask – Calculation finished: CalculationResult{taskName='CalculationTask_4', text='This is result: 104326011', number=104326011, durationMs=2120}
20
我的发现。
上面的代码等价于 Executors.newFixedThreadPool(n),但是我们使用固定容量为 100 的 ArrayBlockingQueue 而不是默认的无限 LinkedBlockingQueue。这意味着如果 100 个任务已经排队(并且 n 正在执行),新任务将被拒绝并出现 RejectedExecutionException .
ThreadPoolExecutor
使用 a LinkedBlockingQueue
,默认情况下是无限制的。
正如上面的帖子所暗示的:
final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
executorService = new ThreadPoolExecutor(n, n, 0L, TimeUnit.MILLISECONDS, queue);
解决方案
您回答了自己的问题...您可以使用Queue
size 来做到这一点..
int poolSize = ...;
int queueSize = ...;
CustomRejectedExecutionHandler handler = new CustomRejectedExecutionHandler();
ExecutorService executorService = new ThreadPoolExecutor(poolSize, poolSize,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueSize),
handler);
您可以CustomRejectedExecutionHandler
用来处理被拒绝的线程。
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.log4j.Logger;
public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
public static final Logger LOGGER = Logger.getLogger(CustomRejectedExecutionHandler.class);
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
LOGGER.error(runnable.toString() + " execution rejected.");
}
}
推荐阅读
- chart.js - Angular 9 上的 ng2 图表没有响应
- java - 第一次保存时出现事务错误,分离的实体传递给持久化
- javascript - 为什么 React 调用我的组件的次数比需要的多?
- java - 获取 RouterFunctionMapping 的处理程序方法名称
- nginx-config - NGINX 代理返回错误的端口
- javascript - 在 React Native 中单击时排序和排序列表
- snowflake-cloud-data-platform - 雪花特权
- javascript - 使用 JavaScript reduce 更新并添加到对象数组
- sql-server - 尝试更新 SQL Server 中的列值会导致错误?
- c++ - 在类中声明函数数组