首页 > 解决方案 > Java 并发实践“清单 12.5. BoundedBuffer 的生产者-消费者测试程序。” 循环障碍等待理解?

问题描述

我正在阅读 Java Concurrency in Practice 并遇到以下代码片段。清单 12.5 https://jcip.net/listings/PutTakeTest.java

// Listing 12.5. Producer-consumer test program for BoundedBuffer.
package net.jcip.examples;

import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

import junit.framework.TestCase;

/**
 * PutTakeTest
 * <p/>
 * Producer-consumer test program for BoundedBuffer
 *
 * @author Brian Goetz and Tim Peierls
 */
public class PutTakeTest extends TestCase {
    protected static final ExecutorService pool = Executors.newCachedThreadPool();
    protected CyclicBarrier barrier;
    protected final SemaphoreBoundedBuffer<Integer> bb;
    protected final int nTrials, nPairs;
    protected final AtomicInteger putSum = new AtomicInteger(0);
    protected final AtomicInteger takeSum = new AtomicInteger(0);

    public static void main(String[] args) throws Exception {
        new PutTakeTest(10, 10, 100000).test(); // sample parameters
        pool.shutdown();
    }

    public PutTakeTest(int capacity, int npairs, int ntrials) {
        this.bb = new SemaphoreBoundedBuffer<Integer>(capacity);
        this.nTrials = ntrials;
        this.nPairs = npairs;
        this.barrier = new CyclicBarrier(npairs * 2 + 1);
    }

    void test() {
        try {
            for (int i = 0; i < nPairs; i++) {
                pool.execute(new Producer());
                pool.execute(new Consumer());
            }
            barrier.await(); // wait for all threads to be ready
            barrier.await(); // wait for all threads to finish
            assertEquals(putSum.get(), takeSum.get());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    static int xorShift(int y) {
        y ^= (y << 6);
        y ^= (y >>> 21);
        y ^= (y << 7);
        return y;
    }

    class Producer implements Runnable {
        public void run() {
            try {
                int seed = (this.hashCode() ^ (int) System.nanoTime());
                int sum = 0;
                barrier.await();
                for (int i = nTrials; i > 0; --i) {
                    bb.put(seed);
                    sum += seed;
                    seed = xorShift(seed);
                }
                putSum.getAndAdd(sum);
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    class Consumer implements Runnable {
        public void run() {
            try {
                barrier.await();
                int sum = 0;
                for (int i = nTrials; i > 0; --i) {
                    sum += bb.take();
                }
                takeSum.getAndAdd(sum);
                barrier.await();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }
}

我发现很难理解如何在主线程或可运行线程中第二次调用循环屏障。根据我的理解,循环屏障将阻塞线程,直到在所有线程上调用 await 并且屏障计数与构造函数中传递的值匹配。当第一次在线程上等待屏障时,循环屏障中的等待计数将是 (npairs * 2 + 1) 所需值的一半。控件如何在生产者和消费者中执行 put sum 和 take sum 计算,并在主线程上连续执行?

如果这个问题听起来很幼稚,请提前道歉。

标签: javamultithreadingconcurrency

解决方案


主线程在调用屏障之前启动npairs生产者和消费者。每一个如果生产者和消费者调用so 与一个主线程一起,它允许所有线程通过屏障。npairsawaitawait


推荐阅读