java - RabbitMq:经久不衰,不像我想象的那样
问题描述
所以,一步一步,我想做什么:
如果发送到另一个系统时发生错误,我会收到数据,然后我想将数据发送到rabbitMQ:
@Override
public void updateAnketaIfThrowThenSendMessageInRabbit(ProfileId profileId, ChangeClientAnketaRequest anketa, String profileVersion) {
try {
anketaService.updateAnketa(profileId, anketa, profileVersion);
} catch (ClubProNotAvailableException e) {
rabbitTemplate.convertAndSend(config.getExchange(), config.getRoutingKey(), clubProNotAvailableRabbit);
Anketa a = conversionService.convert(conversionService.convert(anketa, UgAnketa.class), Anketa.class);
profileService.updateProfileAnketa(profileId, a, null);
}
}
}
接下来,我想接受这些数据和队列,并尝试在某个时间间隔再次发送它们。
为此我:
我接受消息
我正在尝试重新发送它:
a) 如果一切顺利,我将其从队列中删除
b) 如果发生错误,我调用容器的停止方法。一段时间后,我使用调度程序调用容器的启动方法
@Override
public void onMessage(Message message, Channel channel) throws IOException {
ClubProNotAvailableRabbit data = null;
try {
data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
methodCall(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ClubProNotAvailableException e) {
listenerContainer.stop();
throw new ClubProNotAvailableException();
}
}
public void startContainer() {
listenerContainer.start();
}
我遇到过这样的问题:
消息不是每次都传递到队列中。有时我必须多次调用 convert And Send 方法。
当我从队列中获取消息并发生错误时,我关闭了容器,然后当它打开时,队列是空的,当我关闭时,我看到以下消息:
2020-07-20 21:36:59.878 [INFO ] o.s.a.r.l.SimpleMessageListenerContainer - Workers not finished.
2020-07-20 21:36:59.878 [WARN ] o.s.a.r.l.SimpleMessageListenerContainer - Closing channel for unresponsive consumer: Consumer@77416991: tags=[[amq.ctag-E62UisbYdAAOQIM2bWr08w]], channel=Cached Rabbit Channel: AMQChannel(amqp://usergate_tst@10.64.177.12:5672/,35), conn: Proxy@6c60c170 Shared Rabbit Connection: SimpleConnection@5fb65b3a [delegate=amqp://usergate_tst@10.64.177.12:5672/, localPort= 59801], acknowledgeMode=MANUAL local queue size=0
我该如何解决这种情况?
继续的问题。
我更正了这样的代码:
@Override
public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
ClubProNotAvailableRabbit data = null;
try {
data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
methodCall(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (ClubProNotAvailableException e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
Thread.sleep(20000);
}
}
Thread.sleep 在这里进行实验。
我希望当我从 rabbitmq 管理控制台的队列中获取消息时,我会看到它进入 Unacked 状态,这就是它发生的方式。
然后,当发生错误时,我调用 basicReject 方法,并且我希望状态在 basicReject 调用行之后立即变为就绪状态,但在方法完成后立即变为就绪状态。
未确认状态:
为什么会这样?它应该如何工作以及什么机制?为什么调用 baseReject 方法后消息没有立即准备好(控制台兔子中的状态)?
解决方案
为无响应的消费者关闭通道:
这意味着侦听器“卡在”您的代码中-您不能stop()
从侦听器本身调用-container.stop()
等待侦听器退出。你应该stop(() -> log.info("stopped container"))
改用。
您需要basicReject
在 catch 案例中 - 容器不会使用 MANUAL acks 为您处理它。
如果您自己确认/确认消息,则必须使用手动确认。
通常最好让容器来处理您的消息。
推荐阅读
- c++11 - wxTimer 没有调用覆盖的 Notify()
- html - rails 循环遍历表单中的必需属性
- vba - 有没有办法遍历所有的形状属性?(VBA 字)
- angularjs - AngularJS:过滤时如何忽略空值?
- c# - 如何忽略 Json.NET 反序列化中的特定字典键?
- mysql - 将 MySQL 表文本值拆分为多行
- javascript - Chrome 扩展:第一个创建的窗口选项卡不调用 chrome.tabs.onUpdated
- sql - CHARINDEX 怪异
- javascript - 将字母转换为带空格的数字
- php - Laravel 在构建查询时如何访问关系?