首页 > 解决方案 > 单生产者多消费者 Java

问题描述

我是 Java 并发的新手,并试图实现/实现单生产者 [P1] 和多消费者 [C1,C2,C3]。

这个想法是生产者 [P1] 输入值,消费者 C1、C2、C3 都运行他们的任务来单独读取 P1 输入的值。一旦 C1,C2,C3 读取值,P1 再次放入新数据。然后 C1,C2,C3 读取数据并继续此循环。

等待通知适用于单生产者单消费者,但在这种情况下,单生产者多消费者等待通知概念看起来不是一个好的策略。我应该如何处理这个问题。

标签: javamultithreadingconcurrencywaitnotify

解决方案


感谢@Ivan 和@Andreas。

@Ivan - 在他的评论中让我了解了生产者消费者模式的行为方式。@Andreas - 在他的评论中建议使用 Phaser。(我使用 Cyclic Barrier 代替,因为我的注册线程数不会动态变化)

他们的评论都共享以下示例代码。如果有任何或更好的方法来处理这个问题,请建议即兴创作。

主班

    public static void main(String[] args)
    {
        SharedSpace sharedSpace = new SharedSpace(new LinkedBlockingQueue<Integer>(1));
        new Thread(new Producer(sharedSpace)).start();


        Consumer consumerRunnable = new Consumer(sharedSpace);
        new Thread(consumerRunnable).start();

        CyclicBarrier barrier = new CyclicBarrier(3,consumerRunnable);

        new Thread(new EndUser(barrier,consumerRunnable)).start();
        new Thread(new EndUser(barrier,consumerRunnable)).start();
        new Thread(new EndUser(barrier,consumerRunnable)).start();
    }

制片人

private SharedSpace sharedSpace;

public Producer(SharedSpace sharedSpace) {
    super();
    this.sharedSpace = sharedSpace;
}

public SharedSpace getSharedSpace() {
    return sharedSpace;
}

public void setSharedSpace(SharedSpace sharedSpace) {
    this.sharedSpace = sharedSpace;
}

@Override
public void run() {

    for(int i=0;i<3;i++)
    {
        int value = (int) (Math.random()*30);
        sharedSpace.addValue(value);
    }


}

生产者和消费者共享队列

private BlockingQueue<Integer> queue;

    public SharedSpace(BlockingQueue<Integer> queue) {
        super();
        this.queue = queue;
    }

    public BlockingQueue<Integer> getQueue() {
        return queue;
    }

    public void setQueue(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    public void addValue(int value)
    {
        try {
            queue.put(value);
            System.out.println(System.nanoTime()+" Producer added value "+value);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public int getValue() throws InterruptedException
    {
            return queue.take();


    }

消费者

private SharedSpace sharedSpace;

    private Integer value;

    public Consumer(SharedSpace sharedSpace) {
        super();
        this.sharedSpace = sharedSpace;
    }

    public SharedSpace getSharedSpace() {
        return sharedSpace;
    }

    public void setSharedSpace(SharedSpace sharedSpace) {
        this.sharedSpace = sharedSpace;
    }

    public Integer getValue() {
        return value;
    }

    public void setValue(Integer value) {
        this.value = value;
    }

    @Override
    public void run() 
    {

        try {
            setValue(sharedSpace.getValue());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

最终用户

CyclicBarrier barrier;

Consumer consumer;

public EndUser(CyclicBarrier barrier) {
    super();
    this.barrier = barrier;
}

public EndUser(CyclicBarrier barrier, Consumer consumer) {
    super();
    this.barrier = barrier;
    this.consumer = consumer;
}


public Consumer getConsumer() {
    return consumer;
}

public void setConsumer(Consumer consumer) {
    this.consumer = consumer;
}


public CyclicBarrier getBarrier() {
    return barrier;
}


public void setBarrier(CyclicBarrier barrier) {
    this.barrier = barrier;
}


@Override
public void run() {
    try
    {
        while(true)
        {
            System.out.println(consumer.getValue());
            barrier.await();
        }
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }

}

输出 [除非所有最终用户都获取了他们的数据,否则消费者不会从生产者那里读取数据]

Producer added value 24
Producer added value 10
24
24
24
10
10
Producer added value 0
10
0
0
0

推荐阅读