java - RabbitMQ中队列的自动重新声明/恢复
问题描述
我在 java 中有 RabbitMQ 队列使用者应用程序。下面是声明队列的代码片段:
public static void declareQueue(final String rmqQueueName) {
try {
Map<String, Object> args = new HashMap<>();
args.put("x-max-priority", 10);
channel.queueDeclare(rmqQueueName, true, false, false, args);
} catch (final IOException e) {
log.warn("Could not declare the queue: " + rmqQueueName + " ", e);
}
}
消费者代码:
public void consumeBrokerMessage(final String rmqQueueName) throws IOException {
declareQueue(rmqQueueName);
log.info(" [*] Waiting for messages. To exit press CTRL+C");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(
final String consumerTag,
final Envelope envelope,
final AMQP.BasicProperties properties,
final byte[] body) throws IOException {
boolean flag = false;
final String brokerMessage = new String(body, StandardCharsets.UTF_8);
final long start = System.currentTimeMillis();
final OffsetDateTime utcStartDateTime = OffsetDateTime.now(ZoneOffset.UTC);
final String startDateTime =
utcStartDateTime.format(
DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"));
flag = BrokerMessage.handleBrokerMessage(utility);
if (flag) {
log.info("Broker Message has processed");
} else {
log.warn("Error while processing broker message");
}
final long deliveryTag = envelope.getDeliveryTag();
// positively acknowledge a single delivery, the message will
// be discarded
channel.basicAck(deliveryTag, false);
}
};
channel.basicQos(1);
channel.basicConsume(rmqQueueName, false, consumer);
}
================
public static Channel createChannel(
final String rmqHost,
final String rmqPort,
final String rmqUser,
final String rmqPassword,
final String rmqVHost) {
final ConnectionFactory factory = new ConnectionFactory();
factory.setHost(rmqHost);
factory.setPort(Integer.parseInt(rmqPort));
factory.setUsername(rmqUser);
factory.setPassword(rmqPassword);
factory.setVirtualHost(rmqVHost);
**factory.setAutomaticRecoveryEnabled(true);**
final Connection connection;
Channel rmqChannel = null;
try {
connection = factory.newConnection();
rmqChannel = connection.createChannel();
} catch (final IOException | TimeoutException e) {
log.warn("Could not establish rabbitmq connection. ", e);
}
return rmqChannel;
}
当我启动我的应用程序时,以最大优先级 10 创建队列。但是当我从 RabbitMQ 管理中删除队列时,队列不会自行恢复并且不会自动重新声明。有谁知道如何在不重新启动应用程序的情况下自动恢复和重新声明队列?
解决方案
推荐阅读
- java - Spring Boot OAuth 始终重定向到 HTTP (IBM Cloud CF + Spring Boot 2)
- c# - MVVM 将组合框内容绑定到字典中的列表
- php - 我如何从数据库中删除数据,除了最后 24 小时 UNIX 时间格式
- javascript - js window.matchMedia 返回错误的结果
- python - Django - 通过 Ajax 发布 jQuery 字典在 views.py 中打印 None
- java - Apache-POI 在 Excel 中设置值,但另一个单元格的公式无法使用该值,直到我在处理条中手动按 enter
- angular - Angular:如何在动态组件中使用共享模块?
- php - 按自定义字段对管理列进行排序未按预期工作
- soap - 使用 Sabre soap api 访问 pnrs 列表并从队列中删除
- c# - 在 post 方法中初始化新列表时,下拉列表为空