java - 单生产者多消费者 Java
问题描述
我是 Java 并发的新手,并试图实现/实现单生产者 [P1] 和多消费者 [C1,C2,C3]。
这个想法是生产者 [P1] 输入值,消费者 C1、C2、C3 都运行他们的任务来单独读取 P1 输入的值。一旦 C1,C2,C3 读取值,P1 再次放入新数据。然后 C1,C2,C3 读取数据并继续此循环。
等待通知适用于单生产者单消费者,但在这种情况下,单生产者多消费者等待通知概念看起来不是一个好的策略。我应该如何处理这个问题。
解决方案
感谢@Ivan 和@Andreas。
@Ivan - 在他的评论中让我了解了生产者消费者模式的行为方式。@Andreas - 在他的评论中建议使用 Phaser。(我使用 Cyclic Barrier 代替,因为我的注册线程数不会动态变化)
他们的评论都共享以下示例代码。如果有任何或更好的方法来处理这个问题,请建议即兴创作。
主班
public static void main(String[] args)
{
SharedSpace sharedSpace = new SharedSpace(new LinkedBlockingQueue<Integer>(1));
new Thread(new Producer(sharedSpace)).start();
Consumer consumerRunnable = new Consumer(sharedSpace);
new Thread(consumerRunnable).start();
CyclicBarrier barrier = new CyclicBarrier(3,consumerRunnable);
new Thread(new EndUser(barrier,consumerRunnable)).start();
new Thread(new EndUser(barrier,consumerRunnable)).start();
new Thread(new EndUser(barrier,consumerRunnable)).start();
}
制片人
private SharedSpace sharedSpace;
public Producer(SharedSpace sharedSpace) {
super();
this.sharedSpace = sharedSpace;
}
public SharedSpace getSharedSpace() {
return sharedSpace;
}
public void setSharedSpace(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
}
@Override
public void run() {
for(int i=0;i<3;i++)
{
int value = (int) (Math.random()*30);
sharedSpace.addValue(value);
}
}
生产者和消费者共享队列
private BlockingQueue<Integer> queue;
public SharedSpace(BlockingQueue<Integer> queue) {
super();
this.queue = queue;
}
public BlockingQueue<Integer> getQueue() {
return queue;
}
public void setQueue(BlockingQueue<Integer> queue) {
this.queue = queue;
}
public void addValue(int value)
{
try {
queue.put(value);
System.out.println(System.nanoTime()+" Producer added value "+value);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public int getValue() throws InterruptedException
{
return queue.take();
}
消费者
private SharedSpace sharedSpace;
private Integer value;
public Consumer(SharedSpace sharedSpace) {
super();
this.sharedSpace = sharedSpace;
}
public SharedSpace getSharedSpace() {
return sharedSpace;
}
public void setSharedSpace(SharedSpace sharedSpace) {
this.sharedSpace = sharedSpace;
}
public Integer getValue() {
return value;
}
public void setValue(Integer value) {
this.value = value;
}
@Override
public void run()
{
try {
setValue(sharedSpace.getValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
最终用户
CyclicBarrier barrier;
Consumer consumer;
public EndUser(CyclicBarrier barrier) {
super();
this.barrier = barrier;
}
public EndUser(CyclicBarrier barrier, Consumer consumer) {
super();
this.barrier = barrier;
this.consumer = consumer;
}
public Consumer getConsumer() {
return consumer;
}
public void setConsumer(Consumer consumer) {
this.consumer = consumer;
}
public CyclicBarrier getBarrier() {
return barrier;
}
public void setBarrier(CyclicBarrier barrier) {
this.barrier = barrier;
}
@Override
public void run() {
try
{
while(true)
{
System.out.println(consumer.getValue());
barrier.await();
}
}
catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
输出 [除非所有最终用户都获取了他们的数据,否则消费者不会从生产者那里读取数据]
Producer added value 24
Producer added value 10
24
24
24
10
10
Producer added value 0
10
0
0
0
推荐阅读
- spring-boot - Kubernetes - SpringBoot - Neo4j - 启动时出错
- ssis - 在 SSIS 中保护脚本任务
- python - 按时间值升序对数据帧索引进行排序
- java - 当使用 Intent 时从 Java Activity 启动 Kotlin Activity 时,应用程序崩溃
- c++ - 将 std::decay 与 std::forward 一起使用
- python - Xpath selenium Python的双重条件
- c# - 我可以在 c# 中为 Realm 自定义 getter 和 setter 吗?
- c - C11 标准中的哪些规则决定了 `int tx = INT_MAX +1` 的评估?
- laravel - 如何通过一个表单提交插入多个数据 [ LARAVEL - Eloquent ]
- reactjs - 为什么在事件回调中偶尔会丢失按钮的值字段