首页 > 技术文章 > Java多线程中协作机制

haiyangwu 2019-02-25 17:27 原文

一、生产者、消费者协作机制:

  生产者线程和消费者线程通过共享队列进行协作,生产者将数据或任务放到队列上,而消费者从列队上取数据或任务,如果队列长度有限,在队列满的时候,生产者等待,而在队列为空的时候,消费者等待。

/**
 * 使用两个栈 实现队列
 *
 * 生产者,消费者协作模式:
   共享变量是一个阻塞队列,当队列满了生产者wait(),当队列为空消费者wait(); * * 阻塞队列有: * 接口:BlockingQueue、BlockingDeque 双端队列 * 基数数组的实现类:ArrayBlockingQueue * 基于链表的实现类:LinkedBlockingQueue、LinkedBlockingDeque * 基于堆的实现类:PriorityBlockingQueue *
*/ public class QueueTest<E> { private ArrayDeque<E> producerStack = null; private ArrayDeque<E> consumerStack = null; private static int num = 0; public static void main(String[] args) throws InterruptedException { QueueTest<String> eQueueTest = new QueueTest<>(); new Thread(new Runnable() { @Override public void run() { try { eQueueTest.producer(); Thread.sleep( (int)(Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); // Thread.sleep( (int)(Math.random() * 100)); new Thread(new Runnable() { @Override public void run() { try { eQueueTest.consumer(); Thread.sleep( (int)(Math.random() * 100)); } catch (InterruptedException e) { e.printStackTrace(); } } }).start(); } public QueueTest() { producerStack = new ArrayDeque<>(10); consumerStack = new ArrayDeque<>(10); } public synchronized void producer() throws InterruptedException { while (true){ while(producerStack.size() == 10){ this.wait(); } producerStack.addLast((E) String.valueOf(++num));
System.out.println(
"producerStack size :" + producerStack.size() ); for (E e : producerStack) { System.out.println("producerStack..." + e.toString()); } this.notifyAll(); } } public synchronized void consumer() throws InterruptedException { while(true){ while(producerStack.isEmpty()){ this.wait(); }
consumerStack.addLast(producerStack.pollLast());
System.out.println(
"consumerStack size :" + consumerStack.size() ); for (E c : consumerStack) { System.out.println("consumerStack..." + c.toString()); } System.out.println("------------------------------------------------------------"); if(consumerStack.size() == 10){ while(consumerStack.size() > 0){ System.out.println("consumerStack,pollLast :" + consumerStack.pollLast().toString()); } } this.notifyAll(); } } }

 

二、同步协作机制:

  在一些程序,尤其是模拟仿真程序中,要求多个线程同时开始。

/**
 * 同步协作机制
 *
 */
public class SynchronizationTest {

    private volatile boolean fired = false;

    public static void main(String[] args){

        SynchronizationTest synchronizationTest = new SynchronizationTest();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //等待住线程唤醒
                    synchronizationTest.await();

                    System.out.println("thread 1: " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    //等待住线程唤醒
                    synchronizationTest.await();

                    System.out.println("thread 2: " + Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // 唤醒所有线程
        synchronizationTest.fire();
    }

    public synchronized void await() throws InterruptedException {
        while (!fired){
            wait();
        }
    }

    public synchronized void fire(){
        this.fired = true;
        notifyAll();
    }

}

 

三、主从协作机制(等待结束、异步结果):

  主线程将任务分解为若干个子任务,为每个子任务创建一个线程,主线程在继续执行其他任务之前需要等待每个子任务执行完毕。 

/**
 *  等待所有子线程执行完毕,主线程再执行
 *
 *  协作对象
 *
 *  同步类:
 *      CountDownLatch
 *
 */
public class MyLatch {

    public static void main(String[] args) throws InterruptedException {

        int countNum = 10;
        CountDownLatch countDownLatch = new CountDownLatch(countNum);

        for (int j = 0; j < countNum; j++){
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("countDownLatch : " + Thread.currentThread().getName());

                        Thread.sleep((int)(Math.random() * 100));

                        countDownLatch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            }, "thread" + j).start();
        }
        countDownLatch.await();
        System.out.println("collect worker result");

    }

}
/**
 *  主从模式,一种常见的模式是异步调用,异步调用返回一个Future对象,通过它获得最终的结果;
 *
 *  1.表示异步结果的接口 Future、FutureTask
 *
 *  2.用于执行异步任务的接口Executor,以及子接口ExecutorService
 *
 *  3.用于创建Executor和ExecutorService的工厂方法类Executors
 *
 *  4.Callable 创建子任务的接口
 *
 */
public class AsynchronizationTest {


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newCachedThreadPool();
List
<MyFuture> myFutureList = new ArrayList<>(); for (int i = 1; i <= 10 ; i++) { myFutureList.add(new MyFuture("callable " + i)); } // 实现 callable 的子类实例 List<Future<String>> futures = executor.invokeAll(myFutureList); System.out.println("futures size : " + futures.size()); futures.forEach(p -> { try { if(!p.isDone()){ System.out.println("future is not done "); } System.out.println("futures" + p.get()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }); executor.shutdown(); } /** * 实现 Callable 接口 */ static class MyFuture implements Callable<String> { private String name;
public MyFuture(String name){ this.name = name; } @Override public String call() throws Exception { return "callable " + name; } } }

 

四、集合点协作机制:

  给所有线程到指定一个集合点,当都到达时才继续执行下面的程序。

/**
 *  多线程集合点协调模式
 */
public class CyclicBarrierTest {

    public static void main(String[] args){
        int num = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(num, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有子线程已集合,继续执行任务..."); //所有线程集合后执行
            }
        });


        for (int i = 1; i <= num; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((int)(Math.random() * 100));

                        System.out.println(Thread.currentThread().getName() + " 等待集合...");

                        cyclicBarrier.await();

                        System.out.println(Thread.currentThread().getName() + " 已集合,继续执行...");

                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }
            }, "cyclicBarrier " + i).start();
        }
    }
}

推荐阅读