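hazelcast - Removing consumed items from a Hazelcast queue
Problem description
I am trying to implement a producer-consumer model using Hazelcast.
The producer puts an item into a queue, and the consumer consumes it with the take() method.
I shut down the consumer application and started it again. The consumer then retrieved items from the queue that it had already consumed.
I also tried the Hazelcast Ringbuffer and saw the same behavior.
Is there a way to force consumed items to be removed from a Hazelcast queue?
Thanks in advance.
Producer.java: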
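// Imports assume the Hazelcast 4.x (or later) package layout.
import com.hazelcast.cluster.Cluster;
import com.hazelcast.cluster.MembershipEvent;
import com.hazelcast.cluster.MembershipListener;
import com.hazelcast.collection.IQueue;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.cp.IAtomicLong;
import com.hazelcast.map.IMap;
import java.util.List;
import java.util.Scanner;

public class Producer implements MembershipListener {
    private HazelcastInstance hzInstance;
    private Cluster cluster;
    private IAtomicLong counter;
    private IQueue<Data> dataQueue;
    private IMap<String, List<Data>> dataByConsumerId;

    public static void main(String[] args) {
        Producer producer = new Producer();
        Scanner scanIn = new Scanner(System.in);
        // Read commands until QUIT or end of input; ADD enqueues a new item.
        while (scanIn.hasNextLine()) {
            String cmd = scanIn.nextLine();
            if (cmd.equals("QUIT")) {
                break;
            } else if (cmd.equals("ADD")) {
                long x = producer.counter.addAndGet(1);
                producer.dataQueue.add(new Data(x, x + 1));
            }
        }
        scanIn.close();
    }

    public Producer() {
        hzInstance = Hazelcast.newHazelcastInstance(configuration());
        counter = hzInstance.getCPSubsystem().getAtomicLong("COUNTER");
        dataByConsumerId = hzInstance.getMap("CONSUMER_DATA");
        dataQueue = hzInstance.getQueue("DATA_QUEUE");
        cluster = hzInstance.getCluster();
        // Get notified when members (including consumers) join or leave.
        cluster.addMembershipListener(this);
    }

    public Config configuration() {
        Config config = new Config();
        config.setInstanceName("hazelcast-instance");
        // Map configs are matched by map name, so this one applies to a map
        // named "configuration", not to CONSUMER_DATA.
        MapConfig mapConfig = new MapConfig();
        mapConfig.setName("configuration");
        mapConfig.setTimeToLiveSeconds(-1);
        config.addMapConfig(mapConfig);
        return config;
    }

    @Override
    public void memberAdded(MembershipEvent membershipEvent) {
    }

    @Override
    public void memberRemoved(MembershipEvent membershipEvent) {
        // Re-queue every item recorded for the member that left the cluster.
        String removedConsumerId = membershipEvent.getMember().getUuid().toString();
        List<Data> items = dataByConsumerId.remove(removedConsumerId);
        if (items == null)
            return;
        items.forEach(item -> {
            System.out.println("Push data to recover :" + item.toString());
            dataQueue.add(item);
        });
    }
}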
Consumer.java:
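// Imports assume the Hazelcast 4.x (or later) package layout.
import com.hazelcast.collection.IQueue;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.map.IMap;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class Consumer {
    private String id;
    private HazelcastInstance hzInstance;
    private IMap<String, List<Data>> dataByConsumerId;
    private IQueue<Data> dataQueue;

    public Consumer() {
        hzInstance = Hazelcast.newHazelcastInstance(configuration());
        // Use this member's UUID as the consumer id.
        id = hzInstance.getLocalEndpoint().getUuid().toString();
        dataByConsumerId = hzInstance.getMap("CONSUMER_DATA");
        dataByConsumerId.put(id, new ArrayList<Data>());
        dataQueue = hzInstance.getQueue("DATA_QUEUE");
    }

    public Config configuration() {
        Config config = new Config();
        config.setInstanceName("hazelcast-instance");
        MapConfig mapConfig = new MapConfig();
        mapConfig.setName("configuration");
        mapConfig.setTimeToLiveSeconds(-1);
        config.addMapConfig(mapConfig);
        return config;
    }

    public static void main(String[] args) {
        Consumer consumer = new Consumer();
        try {
            consumer.run();
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void run() {
        while (true) {
            System.out.println("Take queue item...");
            try {
                // take() blocks until an item is available and removes it from the queue.
                var item = dataQueue.take();
                System.out.println("New item taken:" + item.toString());
                // Record the item under this consumer's id so the producer can
                // re-queue it if this member leaves the cluster.
                var dataInCluster = dataByConsumerId.get(id);
                dataInCluster.add(item);
                dataByConsumerId.put(id, dataInCluster);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}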
Data.java:
import java.io.Serializable;

// Simple serializable payload carried through the queue and the map.
public class Data implements Serializable {
    private static final long serialVersionUID = -975095628505008933L;
    private long x, y;

    public Data(long x, long y) {
        this.x = x;
        this.y = y;
    }

    public long getX() {
        return x;
    }

    public void setX(long x) {
        this.x = x;
    }

    public long getY() {
        return y;
    }

    public void setY(long y) {
        this.y = y;
    }

    @Override
    public String toString() {
        return "Data [x=" + x + ", y=" + y + "]";
    }
}
Solution
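One possible reading of the code above, offered as an assumption rather than a confirmed diagnosis: take() does remove the head item from the queue (IQueue extends java.util.concurrent.BlockingQueue), but Consumer.run() also appends every taken item to its list in the CONSUMER_DATA map and never removes it. When the consumer shuts down, Producer.memberRemoved() pushes that whole list back onto DATA_QUEUE, so the restarted consumer would be seeing re-queued copies rather than items that were never removed.

If that reading is right, one option is to drop an item from the per-consumer list as soon as it has been fully processed, so that only unacknowledged items are recovered. The sketch below illustrates the idea with plain JDK collections standing in for IQueue and IMap so that it runs without a cluster. The class InFlightTracker and its methods takeAndTrack, acknowledge and recover are hypothetical names, not Hazelcast API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical helper, not part of Hazelcast. JDK collections stand in for the
// cluster structures: queue ~ IQueue "DATA_QUEUE", inFlight ~ IMap "CONSUMER_DATA".
class InFlightTracker {
    final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    final Map<String, List<String>> inFlight = new ConcurrentHashMap<>();

    // Take the head item (this removes it from the queue) and record it as in flight.
    String takeAndTrack(String consumerId) {
        try {
            String item = queue.take();
            inFlight.compute(consumerId, (id, items) -> {
                List<String> updated = (items == null) ? new ArrayList<>() : items;
                updated.add(item);
                return updated;
            });
            return item;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for an item", e);
        }
    }

    // Call once an item is fully processed, so it is no longer eligible for recovery.
    void acknowledge(String consumerId, String item) {
        inFlight.computeIfPresent(consumerId, (id, items) -> {
            items.remove(item);
            return items;
        });
    }

    // Mirrors Producer.memberRemoved(): re-queue only what the departed consumer
    // had taken but not acknowledged. Returns the number of re-queued items.
    int recover(String consumerId) {
        List<String> pending = inFlight.remove(consumerId);
        if (pending == null) {
            return 0;
        }
        queue.addAll(pending);
        return pending.size();
    }

    public static void main(String[] args) {
        InFlightTracker tracker = new InFlightTracker();
        tracker.queue.add("item-1");
        tracker.queue.add("item-2");

        String first = tracker.takeAndTrack("consumer-1");
        tracker.acknowledge("consumer-1", first);   // processed successfully
        tracker.takeAndTrack("consumer-1");         // taken, then the consumer "crashes"

        int requeued = tracker.recover("consumer-1");
        System.out.println("Re-queued " + requeued + " item(s): " + tracker.queue);
    }
}
```

In the real Consumer, the equivalent change would be to remove the item from the list stored under the consumer's id and put the list back into the map after processing, since IMap.get() returns a copy by default. As for the Ringbuffer observation: reads from a Hazelcast Ringbuffer are non-destructive, so a reader that restarts from the head sequence sees the same items again until they are overwritten or expire.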