java - Java ForkJoinPool - 队列中的任务顺序
问题描述
我想了解 Java fork-join pool 中处理任务的顺序。
到目前为止,我在文档中找到的唯一相关信息是关于一个名为“asyncMode”的参数,它是“如果该池对从未加入的分叉任务使用本地先进先出调度模式,则为 true” .
我对这句话的解释是,每个工人都有自己的任务队列;工作人员从自己队列的前面获取任务,或者如果他们自己的队列为空,则窃取其他工作人员队列的后面;如果 asyncMode 为 true(resp.false),worker 将新分叉的任务添加到他们自己队列的后面(resp.front)。
如果我的解释有误,请纠正我!
现在,这提出了几个问题:
1)加入的分叉任务的顺序是什么?
我的猜测是,当一个任务被分叉时,它会被添加到工作人员的队列中,如我上面的解释中所述。现在,假设任务已加入...
如果在调用 join 时任务尚未启动,则调用 join 的工作人员会将任务拉出队列并立即开始处理它。
如果在调用 join 时,该任务已被另一个 worker 窃取,那么调用 join 的 worker 将同时处理其他任务(按照我上面的解释中描述的获取任务的顺序),直到它是的任务偷它的工人已经完成了加入。
这个猜测是基于使用 print 语句编写简单的测试代码,并观察改变连接调用顺序影响任务处理顺序的方式。有人可以告诉我我的猜测是否正确吗?
2) 外部提交的任务的顺序是什么?
根据这个问题的答案,fork-join 池不使用外部队列。(顺便说一下,我使用的是 Java 8。)
那么我是否理解当外部提交任务时,该任务被添加到随机选择的工作队列中?
如果是这样,外部提交的任务是添加到队列的后面还是前面?
最后,这取决于是调用 pool.execute(task) 还是调用 pool.invoke(task) 提交任务?这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是该 fork-join 池中的线程?
解决方案
- 你的猜测是正确的,你是完全正确的。正如您可以在“实施概述”中阅读的那样。
* Joining Tasks * ============= * * Any of several actions may be taken when one worker is waiting * to join a task stolen (or always held) by another. Because we * are multiplexing many tasks on to a pool of workers, we can't * just let them block (as in Thread.join). We also cannot just * reassign the joiner's run-time stack with another and replace * it later, which would be a form of "continuation", that even if * possible is not necessarily a good idea since we may need both * an unblocked task and its continuation to progress. Instead we * combine two tactics: * * Helping: Arranging for the joiner to execute some task that it * would be running if the steal had not occurred. * * Compensating: Unless there are already enough live threads, * method tryCompensate() may create or re-activate a spare * thread to compensate for blocked joiners until they unblock.
2. ForkJoinPool.invoke 和 ForkJoinPool.join 在提交任务的方式上完全相同。你可以在代码中看到
public <T> T invoke(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
return task.join();
}
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
externalPush(task);
}
在 externalPush 中,您可以看到任务被添加到使用 ThreadLocalRandom 随机选择的工作队列中。此外,它使用推送方法进入队列的头部。
final void externalPush(ForkJoinTask<?> task) {
WorkQueue[] ws; WorkQueue q; int m;
int r = ThreadLocalRandom.getProbe();
int rs = runState;
if ((ws = workQueues) != null && (m = (ws.length - 1)) >= 0 &&
(q = ws[m & r & SQMASK]) != null && r != 0 && rs > 0 &&
U.compareAndSwapInt(q, QLOCK, 0, 1)) {
ForkJoinTask<?>[] a; int am, n, s;
if ((a = q.array) != null &&
(am = a.length - 1) > (n = (s = q.top) - q.base)) {
int j = ((am & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);
U.putOrderedInt(q, QTOP, s + 1);
U.putIntVolatile(q, QLOCK, 0);
if (n <= 1)
signalWork(ws, q);
return;
}
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
externalSubmit(task);
}
我不确定你的意思是什么:
这是否取决于调用 pool.execute(task) 或 pool.invoke(task) 的线程是外部线程还是该 fork-join 池中的线程?
推荐阅读
- javascript - 语义 UI 表中的可点击行返回错误
- javascript - 调用通用 HTTP 处理程序
- node.js - 如何在 MongoDB 的更新中使用算术运算符?
- python - 如何将变量与 broken_bath 图一起使用?
- xamarin.forms - 在媒体播放器中使用来自 android 项目的文件
- azerothcore - 如何在 AzerothCore 中通过补丁限制内容
- github - Git-在特定分支上推送代码期间的存储库错误,远程:未找到存储库
- xcode - 如何解决导致项目损坏的 git 合并冲突
- android - 无需计算机的 Android UI 自动化
- linux - 打印数组值上的段错误组装 NASM