首页 > 解决方案 > 从 Hazelcast 的队列中删除消耗的项目

问题描述

我正在尝试使用 Hazelcast 实现生产者-消费者模型。
生产者将一个项目放入队列,消费者使用 take() 方法消费它。
我关闭消费者应用程序后又重新启动它,结果消费者再次从队列中取到了之前已经消费过的项目。
我尝试了 Hazelcast Ringbuffer,我看到了同样的行为。
有没有办法强制从 Hazelcast 的队列中删除消耗的项目?
提前致谢

Producer.java:

/**
 * Interactive producer for the {@code DATA_QUEUE} Hazelcast queue.
 *
 * <p>Starts a Hazelcast member, then reads commands from standard input: {@code ADD} enqueues a
 * new {@link Data} item built from the cluster-wide {@code COUNTER}, {@code QUIT} stops the
 * program. The producer also listens for membership changes and, when a member leaves, pushes
 * back onto the queue every item that member had recorded in the {@code CONSUMER_DATA} map.
 */
public class Producer implements MembershipListener {
  private final HazelcastInstance hzInstance;

  private final Cluster cluster;

  private final IAtomicLong counter;

  private final IQueue<Data> dataQueue;

  /** Items recorded per consumer, keyed by the consumer's member UUID string. */
  private final IMap<String, List<Data>> dataByConsumerId;

  /**
   * Runs the command loop until {@code QUIT} is entered or standard input is exhausted.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    Producer producer = new Producer();

    try (Scanner scanIn = new Scanner(System.in)) {
      // hasNextLine() ends the loop at end of input instead of letting
      // nextLine() throw NoSuchElementException.
      while (scanIn.hasNextLine()) {
        String cmd = scanIn.nextLine();
        if (cmd.equals("QUIT")) {
          break;
        }
        if (cmd.equals("ADD")) {
          long x = producer.counter.addAndGet(1);
          producer.dataQueue.add(new Data(x, x + 1));
        }
      }
    } finally {
      // Leave the cluster explicitly; otherwise the member started in the
      // constructor keeps running after the command loop has ended.
      producer.hzInstance.shutdown();
    }
  }

  /**
   * Starts the Hazelcast member, looks up the shared counter, map and queue, and registers this
   * object as a membership listener.
   */
  public Producer() {
    hzInstance = Hazelcast.newHazelcastInstance(configuration());

    counter = hzInstance.getCPSubsystem().getAtomicLong("COUNTER");

    dataByConsumerId = hzInstance.getMap("CONSUMER_DATA");

    dataQueue = hzInstance.getQueue("DATA_QUEUE");

    cluster = hzInstance.getCluster();
    cluster.addMembershipListener(this);
  }

  /**
   * Builds the member configuration: instance name {@code hazelcast-instance} plus one map
   * config named {@code configuration} with a time-to-live of {@code -1} seconds.
   *
   * <p>NOTE(review): the map config is registered under the name {@code configuration}, while
   * the map this class actually uses is {@code CONSUMER_DATA} - confirm which map it is meant
   * to apply to.
   *
   * @return the configuration passed to {@code Hazelcast.newHazelcastInstance}
   */
  public Config configuration() {
    Config config = new Config();
    config.setInstanceName("hazelcast-instance");
    MapConfig mapConfig = new MapConfig();
    mapConfig.setName("configuration");
    mapConfig.setTimeToLiveSeconds(-1);
    config.addMapConfig(mapConfig);
    return config;
  }

  /** No action is taken when a member joins. */
  @Override
  public void memberAdded(MembershipEvent membershipEvent) {

  }

  /**
   * Re-queues everything the departed member had recorded in {@code CONSUMER_DATA}.
   *
   * <p>The member's entry is removed from the map and each item in it is added back to
   * {@code DATA_QUEUE}. Note that this re-delivers every item in that list, so a consumer that
   * records items without ever removing them will see all of them again after it restarts.
   *
   * @param membershipEvent event identifying the member that left
   */
  @Override
  public void memberRemoved(MembershipEvent membershipEvent) {
    String removedConsumerId = membershipEvent.getMember().getUuid().toString();
    List<Data> items = dataByConsumerId.remove(removedConsumerId);
    if (items == null) {
      // The departed member never registered a list (e.g. it was not a consumer).
      return;
    }
    items.forEach(item -> {
      System.out.println("Push data to recover :" + item.toString());
      dataQueue.add(item);
    });

  }

}

Consumer.java:

/**
 * Consumer for the {@code DATA_QUEUE} Hazelcast queue.
 *
 * <p>Starts a Hazelcast member, registers an empty list under its own member UUID in the
 * {@code CONSUMER_DATA} map, then repeatedly takes items from the queue and appends each one to
 * that list.
 *
 * <p>NOTE(review): {@link #run()} only ever appends to the list; nothing in this class removes
 * an item once it has been handled. {@code Producer.memberRemoved} adds the whole list back to
 * the queue when this member leaves, so stopping the consumer re-queues every item it has taken
 * and a restarted consumer receives them again. If only unfinished items should be recovered,
 * remove each item from the list once its processing is complete.
 */
public class Consumer {
  /** This member's UUID string; the key of this consumer's entry in {@code CONSUMER_DATA}. */
  private final String id;
  private final HazelcastInstance hzInstance;
  private final IMap<String, List<Data>> dataByConsumerId;
  private final IQueue<Data> dataQueue;

  /**
   * Starts the Hazelcast member and registers an empty item list for it in
   * {@code CONSUMER_DATA}.
   */
  public Consumer() {
    hzInstance = Hazelcast.newHazelcastInstance(configuration());
    id = hzInstance.getLocalEndpoint().getUuid().toString();
    dataByConsumerId = hzInstance.getMap("CONSUMER_DATA");
    dataByConsumerId.put(id, new ArrayList<Data>());
    dataQueue = hzInstance.getQueue("DATA_QUEUE");
  }

  /**
   * Builds the member configuration: instance name {@code hazelcast-instance} plus one map
   * config named {@code configuration} with a time-to-live of {@code -1} seconds.
   *
   * <p>NOTE(review): the map config is registered under the name {@code configuration}, while
   * the map this class actually uses is {@code CONSUMER_DATA} - confirm which map it is meant
   * to apply to.
   *
   * @return the configuration passed to {@code Hazelcast.newHazelcastInstance}
   */
  public Config configuration() {
    Config config = new Config();
    config.setInstanceName("hazelcast-instance");
    MapConfig mapConfig = new MapConfig();
    mapConfig.setName("configuration");
    mapConfig.setTimeToLiveSeconds(-1);
    config.addMapConfig(mapConfig);
    return config;
  }

  /**
   * Runs the consume loop and shuts the Hazelcast member down when the loop ends.
   *
   * @param args unused
   */
  public static void main(String[] args) {
    Consumer consumer = new Consumer();
    try {
      consumer.run();
    } finally {
      // Leave the cluster explicitly whether the loop ended by interruption
      // or by an unexpected runtime exception.
      consumer.hzInstance.shutdown();
    }
  }

  /**
   * Takes items from the queue until the current thread is interrupted, appending each taken
   * item to this consumer's list in {@code CONSUMER_DATA}.
   */
  private void run() {
    while (!Thread.currentThread().isInterrupted()) {

      System.out.println("Take queue item...");
      try {
        Data item = dataQueue.take();
        System.out.println("New item taken:" + item.toString());
        List<Data> dataInCluster = dataByConsumerId.get(id);
        if (dataInCluster == null) {
          // The entry is created in the constructor, but it lives in a shared
          // map and may have been removed by another member.
          dataInCluster = new ArrayList<>();
        }
        dataInCluster.add(item);
        // The list obtained from the map is modified locally, so it is written
        // back explicitly to store the change.
        dataByConsumerId.put(id, dataInCluster);
      } catch (InterruptedException e) {
        // Restore the interrupt flag and stop, rather than swallowing the
        // interruption and blocking on take() again.
        Thread.currentThread().interrupt();
        return;
      }
    }
  }

}

Data.java:

/**
 * Serializable pair of {@code long} values, {@code x} and {@code y}, used as the payload that is
 * placed on the queue.
 */
public class Data implements Serializable {

  private static final long serialVersionUID = -975095628505008933L;

  /** First component. */
  private long x;

  /** Second component. */
  private long y;

  /**
   * Creates a payload holding the two given values.
   *
   * @param x first component
   * @param y second component
   */
  public Data(long x, long y) {
    this.x = x;
    this.y = y;
  }

  /** @return the first component */
  public long getX() {
    return this.x;
  }

  /** @param x new value of the first component */
  public void setX(long x) {
    this.x = x;
  }

  /** @return the second component */
  public long getY() {
    return this.y;
  }

  /** @param y new value of the second component */
  public void setY(long y) {
    this.y = y;
  }

  /** @return text of the form {@code Data [x=<x>, y=<y>]} */
  @Override
  public String toString() {
    StringBuilder text = new StringBuilder("Data [x=");
    text.append(this.x);
    text.append(", y=");
    text.append(this.y);
    text.append(']');
    return text.toString();
  }

}

标签: hazelcast

解决方案


推荐阅读