spring-jms - QPID:添加临时消息消费者以排出队列中未处理的消息
问题描述
我在一个持久的 QPID 队列上有 2 个消息消费者。这些消息消费者是使用以下代码创建的(我在下面的示例中排除了异常处理以保持主题):
class MessageConsumer {
private Session session;
private Queue queue;
private final MessageListener listener;
private final ConditionProvider condition; //can be used to modify the selection string if needed
public MessageConsumer(ConnectionFactory connectionFactory, String queueName, MessageListener listener, ConditionProvider condition) {
session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
queue = session.createQueue(queueName);
}
//Wakes up every minute and processes messages for the given condition
@Scheduled(fixedRateString = "60000", initialDelay = 1000)
public void listen() {
if (messageConsumer != null) {
messageConsumer.close();
}
messageConsumer = session.createConsumer(queue, condition.getCondition());
messageConsumer.setMessageListener(listener);
}
- 每当我想更改用于查询队列的条件时,都会调用 listen()。目前,我在启动时实例化了 MessageConsumer 类的 2 个实例。提供的条件选择队列上低于和高于特定阈值的消息。阈值是在消息头上设置的属性。第一个侦听器用于处理未达到特定阈值(错误率)的消息,而第二个侦听器将已达到阈值的消息移动到另一个队列。目前,这两个都按预期工作。
例子:
- 消费者 1 - 给我阈值 >= 0 和阈值 < 10 的消息并处理它们。侦听器代码然后执行以下操作:如果无法处理它们,它会增加阈值并将其放回队列中。
- 消费者 2 - 给我阈值 > 10 的消息并将它们放在不同的队列中。
我现在想添加一个附加功能,即如果我知道消息未处理的原因已得到解决,则强制排空队列。
什么是有效的选择?
- 停止现有消费者有意义吗?如果是这样,我如何防止@Scheduled 运行并在排空队列后重新启动@Scheduled
- 将新消费者添加到同一个队列并获取所有消息?如何防止现有消费者获取这些消息?
- 更改现有消费者的条件以在下次计划的 listen() 运行中选择所有消息并耗尽所有消息?不知何故,这对我来说更有吸引力,因为它处理了前两个选项。(将其中一个消费者的查询字符串设置为永远不会返回任何消息的内容,并将另一个消费者的查询字符串设置为返回队列中的所有消息)。
任何帮助表示赞赏。
解决方案
推荐阅读
- python - 构建嗖嗖索引时超出最大递归深度
- python - 用全局变量序列化函数
- ios - 具有按比例填充的子视图和内在宽度的 iOS StackViews 未按预期运行
- android - 无法在扩展 View 的内部类中覆盖 onCreateOptionMenu
- python - 如何在幻灯片的特定位置通过 python 将 Excel 文件和链接数据嵌入 PowerPoint
- c# - 无法获取 dll 文件的第 3 方数字签名
- php - Laravel 自定义时间表
- php - 对整数使用 LIKE
- r - RStudio 字符编码
- ios - Firestore 到 TableView [Swift]