首页 > 解决方案 > 在java中的不同线程中同时读取和写入集合

问题描述

这是场景:
我有 3 条车道,我想从中移动我的箱子。每个盒子都有一个 groupId。所以车道将被分配给一个组。
因此,当我收到具有唯一 groupId 的首次移动请求时,我将创建一个列表并将此请求添加到该列表中,然后开始处理该列表。如果我收到另一个具有相同 groupId 的请求,我必须添加到同一个列表
(此列表用于处理不同线程中的请求),
否则创建一个新列表并分配一个新通道并开始处理。
请建议一个 java 集合,它可以帮助有效地实现这一点。
任何帮助将不胜感激!!

标签: javacollections

解决方案


我怀疑List对于您的用例来说是一个很好的数据结构。由于您在线程之间传递数据,因此BlockingQueue看起来更自然。

例如,这里是具有以下假设的实现:

  • 有 1 个box生产者,即:

    1)在 1 到 3 的范围内box随机生成groupId

    box2 )放入单曲 lane

  • 有 3 个box消费者,每个消费者:

    1)boxlane

    2)如果消费者groupId与盒子匹配,则消耗盒子groupId

  • 没有使用额外的框架(仅核心 Java)

import java.util.*;
import java.util.concurrent.*;

class Answer {
  public static void main(String[] args) {
     LinkedBlockingQueue<Box> lane = new LinkedBlockingQueue<>();

     Producer p = new Producer(lane);
     Consumer c1 = new Consumer(1, lane);
     Consumer c2 = new Consumer(2, lane);
     Consumer c3 = new Consumer(3, lane);

     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
     new Thread(c3).start();
  }
}

class Producer implements Runnable {
   private final LinkedBlockingQueue<Box> lane;
   Producer(LinkedBlockingQueue<Box> lane) { 
     this.lane = lane;
   }
   public void run() {
     try {
       while (true) { 
         lane.put(new Box(produceGroupId())); 
       }
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();  // set interrupt flag
     }
   }
   int produceGroupId() {
     // generate random int between 1 and 3
     return ThreadLocalRandom.current().nextInt(1, 4);
   }
}

class Consumer implements Runnable {
   private final int groupId;
   private final BlockingQueue<Box> lane;
   Consumer(int groupId, BlockingQueue<Box> lane) { 
     this.groupId = groupId;
     this.lane = lane;
   }
   public void run() {
     while (true) {
       Box box = lane.peek();
       if (box != null && box.getGroupId() == this.groupId) {
         consume(lane.poll()); 
       }
     }
   }
   void consume(Box box) { 
     System.out.println("Consumer " + groupId + " received " + box + " for proxessing.");
   }
}

class Box {
  private final int groupId;
  public Box(int groupId) {
    this.groupId = groupId;
  }

  public int getGroupId() {
    return this.groupId;
  }

  @Override
  public String toString() {
    return "<Box " + groupId + ">";
  }
}

如果目标是拥有3 个独立的通道,那么实现会有点不同:

import java.util.*;
import java.util.concurrent.*;

class Answer {
  public static void main(String[] args) {
     BlockingQueue<Box> lane1 = new LinkedBlockingQueue<Box>();
     BlockingQueue<Box> lane2 = new LinkedBlockingQueue<Box>();
     BlockingQueue<Box> lane3 = new LinkedBlockingQueue<Box>();
     Map<Integer, BlockingQueue<Box>> lanes = new ConcurrentHashMap<Integer, BlockingQueue<Box>>();
     lanes.put(1, lane1);
     lanes.put(2, lane2);
     lanes.put(3, lane3);

     Producer p = new Producer(lanes);
     Consumer c1 = new Consumer(1, lane1);
     Consumer c2 = new Consumer(2, lane2);
     Consumer c3 = new Consumer(3, lane3);

     new Thread(p).start();
     new Thread(c1).start();
     new Thread(c2).start();
     new Thread(c3).start();
  }
}

class Producer implements Runnable {
   private final Map<Integer, BlockingQueue<Box>> lanes;
   Producer(Map<Integer, BlockingQueue<Box>> lanes) { 
     this.lanes = lanes;
   }
   public void run() {
     try {
       while (true) { 
         int groupId = produceGroupId();
         BlockingQueue<Box> lane = lanes.get(groupId);
         lane.put(new Box(groupId)); 
       }
     } catch (InterruptedException ex) {
       Thread.currentThread().interrupt();
     }
   }
   int produceGroupId() {
     // generate random int between 1 and 3
     return ThreadLocalRandom.current().nextInt(1, 4);
   }
}

class Consumer implements Runnable {
   private final int consumerId;
   private final BlockingQueue<Box> lane;
   Consumer(int consumerId, BlockingQueue<Box> lane) { 
     this.consumerId = consumerId;
     this.lane = lane;
   }
   public void run() {
     try {
       while (true) {
         consume(lane.take()); 
       }
     } catch (InterruptedException ex) {}
   }
   void consume(Box box) { 
     System.out.println("Consumer " + consumerId + " received " + box + " for proxessing.");
   }
}

class Box {
  private final int groupId;
  public Box(int groupId) {
    this.groupId = groupId;
  }

  public int getGroupId() {
    return this.groupId;
  }

  @Override
  public String toString() {
    return "<Box " + groupId + ">";
  }
}

推荐阅读