java - Java 并发实践“清单 12.5. BoundedBuffer 的生产者-消费者测试程序。” 循环障碍等待理解?
问题描述
我正在阅读 Java Concurrency in Practice 并遇到以下代码片段。清单 12.5 https://jcip.net/listings/PutTakeTest.java
// Listing 12.5. Producer-consumer test program for BoundedBuffer.
package net.jcip.examples;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import junit.framework.TestCase;
/**
* PutTakeTest
* <p/>
* Producer-consumer test program for BoundedBuffer
*
* @author Brian Goetz and Tim Peierls
*/
public class PutTakeTest extends TestCase {
protected static final ExecutorService pool = Executors.newCachedThreadPool();
protected CyclicBarrier barrier;
protected final SemaphoreBoundedBuffer<Integer> bb;
protected final int nTrials, nPairs;
protected final AtomicInteger putSum = new AtomicInteger(0);
protected final AtomicInteger takeSum = new AtomicInteger(0);
public static void main(String[] args) throws Exception {
new PutTakeTest(10, 10, 100000).test(); // sample parameters
pool.shutdown();
}
public PutTakeTest(int capacity, int npairs, int ntrials) {
this.bb = new SemaphoreBoundedBuffer<Integer>(capacity);
this.nTrials = ntrials;
this.nPairs = npairs;
this.barrier = new CyclicBarrier(npairs * 2 + 1);
}
void test() {
try {
for (int i = 0; i < nPairs; i++) {
pool.execute(new Producer());
pool.execute(new Consumer());
}
barrier.await(); // wait for all threads to be ready
barrier.await(); // wait for all threads to finish
assertEquals(putSum.get(), takeSum.get());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
static int xorShift(int y) {
y ^= (y << 6);
y ^= (y >>> 21);
y ^= (y << 7);
return y;
}
class Producer implements Runnable {
public void run() {
try {
int seed = (this.hashCode() ^ (int) System.nanoTime());
int sum = 0;
barrier.await();
for (int i = nTrials; i > 0; --i) {
bb.put(seed);
sum += seed;
seed = xorShift(seed);
}
putSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
class Consumer implements Runnable {
public void run() {
try {
barrier.await();
int sum = 0;
for (int i = nTrials; i > 0; --i) {
sum += bb.take();
}
takeSum.getAndAdd(sum);
barrier.await();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
我发现很难理解如何在主线程或可运行线程中第二次调用循环屏障。根据我的理解,循环屏障将阻塞线程,直到在所有线程上调用 await 并且屏障计数与构造函数中传递的值匹配。当第一次在线程上等待屏障时,循环屏障中的等待计数将是 (npairs * 2 + 1) 所需值的一半。控件如何在生产者和消费者中执行 put sum 和 take sum 计算,并在主线程上连续执行?
如果这个问题听起来很幼稚,请提前道歉。
解决方案
主线程在调用屏障之前启动npairs
生产者和消费者。每一个如果生产者和消费者调用so 与一个主线程一起,它允许所有线程通过屏障。npairs
await
await
推荐阅读
- java - MemoryLeakMonitor.jar 不存在!(华为手机)
- django - Django - 可以直接在设置文件中从云服务加载秘密/密码/动态值吗
- python - (discord.py) 如何使我的 setprefix 命令正常工作?
- python - 在默认目录或每个单独项目的目录中创建 Conda 环境是更好的做法吗?
- python - 我无法在 Mac 中从 Python3 下载任何模块
- android-studio - 无法将任何虚拟设备连接到android studio
- c# - 声明的委托上的编译错误“无法转换”
- apache-spark - 将广播变量从 PySpark 传递到 Java 函数时没有属性错误
- swift - 给 UISlider 进度条圆角 [Swift]
- discord.js - DiscordJS 参数切片