首页 > 解决方案 > RabbitMq:经久不衰,不像我想象的那样

问题描述

所以,一步一步,我想做什么:

如果发送到另一个系统时发生错误,我会收到数据,然后我想将数据发送到rabbitMQ:

    @Override
    public void updateAnketaIfThrowThenSendMessageInRabbit(ProfileId profileId, ChangeClientAnketaRequest anketa, String profileVersion) {
        try {
            anketaService.updateAnketa(profileId, anketa, profileVersion);
        } catch (ClubProNotAvailableException e) {
            rabbitTemplate.convertAndSend(config.getExchange(), config.getRoutingKey(), clubProNotAvailableRabbit);
            Anketa a = conversionService.convert(conversionService.convert(anketa, UgAnketa.class), Anketa.class);
            profileService.updateProfileAnketa(profileId, a, null);
        }
    }
}

接下来,我想接受这些数据和队列,并尝试在某个时间间隔再次发送它们。

为此我:

  1. 我接受消息

  2. 我正在尝试重新发送它:

    a) 如果一切顺利,我将其从队列中删除

    b) 如果发生错误,我调用容器的停止方法。一段时间后,我使用调度程序调用容器的启动方法

   @Override
    public void onMessage(Message message, Channel channel) throws IOException {
        ClubProNotAvailableRabbit data = null;
        try {
            data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
            MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
            requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
            methodCall(data);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (ClubProNotAvailableException e) {
            listenerContainer.stop();
            throw new ClubProNotAvailableException();
        }
    }

     public void startContainer() {
        listenerContainer.start();
    }

我遇到过这样的问题:

  1. 消息不是每次都传递到队列中。有时我必须多次调用 convert And Send 方法。

  2. 当我从队列中获取消息并发生错误时,我关闭了容器,然后当它打开时,队列是空的,当我关闭时,我看到以下消息:

2020-07-20 21:36:59.878 [INFO ] o.s.a.r.l.SimpleMessageListenerContainer - Workers not finished.
2020-07-20 21:36:59.878 [WARN ] o.s.a.r.l.SimpleMessageListenerContainer - Closing channel for unresponsive consumer: Consumer@77416991: tags=[[amq.ctag-E62UisbYdAAOQIM2bWr08w]], channel=Cached Rabbit Channel: AMQChannel(amqp://usergate_tst@10.64.177.12:5672/,35), conn: Proxy@6c60c170 Shared Rabbit Connection: SimpleConnection@5fb65b3a [delegate=amqp://usergate_tst@10.64.177.12:5672/, localPort= 59801], acknowledgeMode=MANUAL local queue size=0

我该如何解决这种情况?

继续的问题。

我更正了这样的代码:

   @Override
    public void onMessage(Message message, Channel channel) throws IOException, InterruptedException {
        ClubProNotAvailableRabbit data = null;
        try {
            data = OBJECT_MAPPER.readValue(message.getBody(), ClubProNotAvailableRabbit.class);
            MDC.put(data.getRequestContextRabbit().getRequestId(), UUID.randomUUID().toString());
            requestContextService.put(createRequestContext(data.getRequestContextRabbit(), data.getRequestContextRabbit().getFront()));
            methodCall(data);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (ClubProNotAvailableException e) {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
            Thread.sleep(20000);
        } 
    }

Thread.sleep 在这里进行实验。

我希望当我从 rabbitmq 管理控制台的队列中获取消息时,我会看到它进入 Unacked 状态,这就是它发生的方式。

然后,当发生错误时,我调用 basicReject 方法,并且我希望状态在 basicReject 调用行之后立即变为就绪状态,但在方法完成后立即变为就绪状态。

未确认状态:

rabbitMq 管理控制台

虽然 baseReject 方法已经奏效。 baseReject 工作

为什么会这样?它应该如何工作以及什么机制?为什么调用 baseReject 方法后消息没有立即准备好(控制台兔子中的状态)?

标签: javaspringrabbitmqspring-amqp

解决方案


为无响应的消费者关闭通道:

这意味着侦听器“卡在”您的代码中-您不能stop()从侦听器本身调用-container.stop()等待侦听器退出。你应该stop(() -> log.info("stopped container"))改用。

您需要basicReject在 catch 案例中 - 容器不会使用 MANUAL acks 为您处理它。

如果您自己确认/确认消息,则必须使用手动确认。

通常最好让容器来处理您的消息。


推荐阅读