java - 当 ForkJoinPool 结束时
问题描述
我试图弄清楚所有 ForkJoinPool 线程何时完成它们的任务。我编写了这个测试应用程序(我使用了 System.out,因为它只是一个快速测试应用程序并且没有错误检查/处理):
public class TestForkJoinPoolEnd {
private static final Queue<String> queue = new LinkedList<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
enqueue("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!customThreadPool.isTerminating()) {
String s = dequeue();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {
}
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
}
static List<String> makeList() {
return Stream.generate(() -> makeString())
.limit(MAX_SIZE)
.collect(Collectors.toList());
}
static String makeString() {
int leftLimit = 97; // letter 'a'
int rightLimit = 122; // letter 'z'
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = leftLimit + (int)
(random.nextFloat() * (rightLimit - leftLimit + 1));
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
int sum = 0;
for (int i = 0; i < s.length(); i++) {
sum += s.charAt(i);
}
return (sum / SPEED_UP);
}
static void process(String s) {
StringBuilder sb = new StringBuilder(s);
long start = System.currentTimeMillis();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.currentTimeMillis();
sb.append(" slept for ")
.append((end - start))
.append(" milliseconds");
enqueue(sb.toString());
}
static void enqueue(String s) {
synchronized (queue) {
queue.offer(s);
}
}
static String dequeue() {
synchronized (queue) {
return queue.poll();
}
}
}
这段代码卡住了,永远不会完成。如果我将 while 循环的条件更改为!customThreadPool.isQuiescent()
它会终止循环,并将计数器和队列大小设置为 1。
我应该使用什么来确定线程何时完成?
解决方案
AnExecutorService
不会仅仅因为一项作业(及其子作业)完成而自行终止。线程池背后的整个想法是可重用的。
所以它只会在应用程序调用shutdown()
它时终止。
您可以使用isQuiescent()
来确定是否没有待处理的作业,这仅在所有提交的作业都属于您的特定任务时才有效。使用返回的未来submit
来检查实际工作的完成情况要干净得多。
在任何一种情况下,排队任务的完成状态都不会说明您正在轮询的队列。当您了解提交结束时,您仍然需要检查队列中的待处理元素。
此外,建议使用线程安全的实现,而不是用块BlockingQueue
来装饰 a 。连同其他一些需要清理的东西,代码如下所示:LinkedList
synchronized
public class TestForkJoinPoolEnd {
private static final BlockingQueue<String> QUEUE = new LinkedBlockingQueue<>();
private static final int MAX_SIZE = 5000;
private static final int SPEED_UP = 100;
public static void main(String[] args) {
ForkJoinPool customThreadPool = new ForkJoinPool(12);
ForkJoinTask<?> future = customThreadPool.submit(
() -> makeList()
.parallelStream()
.forEach(TestForkJoinPoolEnd::process));
QUEUE.offer("Theard pool started up");
int counter = MAX_SIZE + 1;
while (!future.isDone()) try {
String s = QUEUE.poll(1, TimeUnit.MILLISECONDS);
if (s != null) {
System.out.println(s);
counter--;
}
} catch (InterruptedException e) {}
for(;;) {
String s = QUEUE.poll();
if (s == null) break;
System.out.println(s);
counter--;
}
System.out.println("counter = " + counter);
System.out.println("isQuiescent = " + customThreadPool.isQuiescent() + " isTerminating " +
"= " + customThreadPool.isTerminating() + " isTerminated = "
+ customThreadPool.isTerminated() + " isShutdown =" + customThreadPool.isShutdown());
customThreadPool.shutdown();
}
static List<String> makeList() {
return IntStream.range(0, MAX_SIZE)
.mapToObj(i -> makeString())
.collect(Collectors.toList());
}
static String makeString() {
int targetStringLength = 10;
Random random = new Random();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('z' - 'a' + 1) + 'a';
buffer.append((char) randomLimitedInt);
}
return buffer.toString();
}
static int toSeed(String s) {
return s.chars().sum() / SPEED_UP;
}
static void process(String s) {
long start = System.nanoTime();
try {
TimeUnit.MILLISECONDS.sleep(toSeed(s));
} catch (InterruptedException e) {
}
long end = System.nanoTime();
QUEUE.offer(s + " slept for " + (end - start)/1000000 + " milliseconds");
}
}
如果您sleep
在接收端的呼叫应该模拟一些工作量而不是等待新项目,您也可以使用
int counter = MAX_SIZE + 1;
while (!future.isDone()) {
String s = QUEUE.poll();
if (s != null) {
System.out.println(s);
counter--;
}
try {
TimeUnit.MILLISECONDS.sleep(1);
} catch (InterruptedException e) {}
}
但逻辑没有改变。在future.isDone()
返回后true
,我们必须重新检查队列中的待处理元素。我们只保证不会有新项目到达,而不是保证队列已经是空的。
作为旁注,该makeString()
方法可以进一步改进为
static String makeString() {
int targetStringLength = 10;
ThreadLocalRandom random = ThreadLocalRandom.current();
StringBuilder buffer = new StringBuilder(targetStringLength);
for (int i = 0; i < targetStringLength; i++) {
int randomLimitedInt = random.nextInt('a', 'z' + 1);
buffer.append((char)randomLimitedInt);
}
return buffer.toString();
}
甚至
static String makeString() {
int targetStringLength = 10;
return ThreadLocalRandom.current()
.ints(targetStringLength, 'a', 'z'+1)
.collect(StringBuilder::new, StringBuilder::appendCodePoint, StringBuilder::append)
.toString();
}
推荐阅读
- c++ - USACO-beads,无法调试 while 循环
- json - ReactJS - 对来自 DBJSON 的数据进行分页
- html - 填充条形图右侧的百分比
- ios - 如何在后台队列中获取对象的领域结果,然后在主线程上使用它
- javascript - Vue cli 3 与 apollo 冲突
- c# - 条件运算符。错误 CS0266 无法将类型“int”隐式转换为“byte”
- python - 我可以在 django pre_save 信号中执行查询吗?
- python - 视频未保存在opencv python中
- javascript - 我收到一个对象作为响应,我使用 response.json() 将其转换为 JSON,但随后无法从 json 中获取值?
- javascript - 动态填充 Angularjs 数据表