首页 > 解决方案 > 我用spring cloud设置了三个rabbitmq队列,其中一个队列随机失败

问题描述

我想做的是,当我在消费者处收到一条消息时,我会根据那里的内容过滤该消息,过滤器将在三个不同的生产者的帮助下将消息发送到 3 个不同的队列。

这是三个队列的配置:

  cloud:
    stream:
      bindings:
        QuickScore:
          concurrency: 5
          destination: quickScore
          content-type: application/json
          group: quickScoreGroup
          maxConcurrency: 10
          recoveryInterval: 10000
        SuitabilityScore:
          concurrency: 5
          destination: suitabilityScore
          content-type: application/json
          group: suitabilityScore
          maxConcurrency: 10
          recoveryInterval: 10000
        CompletenessScore:
          concurrency: 5
          destination: completenessScore
          content-type: application/json
          group: completenessScore
          maxConcurrency: 10
          recoveryInterval: 10000

  rabbitmq:
    host: ${rabbitmq.host:localhost}
    username: guest
    password: guest
    port: 5672

队列的自定义通道

public interface CustomChannels {

  @Output("QuickScore")
  MessageChannel publishMessageToQuickScore();

  @Input("QuickScore")
  SubscribableChannel receivedAtQuickScore();

  @Output("CompletenessScore")
  MessageChannel publishMessageToCompletenessScore();

  @Input("CompletenessScore")
  SubscribableChannel receivedAtCompletenessScore();

  @Output("SuitabilityScore")
  MessageChannel publishMessageToSuitabilityScore();

  @Input("SuitabilityScore")
  SubscribableChannel receivedAtSuitabilityScore();
}

队列的生产者:

@Autowired
  private CustomChannels customChannels;

  public void sendToQuickScore(UpdatedFieldsEntity updatedFieldsEntity) {
    customChannels
        .publishMessageToQuickScore().send(MessageBuilder.withPayload(updatedFieldsEntity).build());
    log.info("sending to Quick score" + updatedFieldsEntity.toString());
  }

  public void sendToCompletenessScore(UpdatedFieldsEntity updatedFieldsEntity) {
    customChannels
        .publishMessageToCompletenessScore()
        .send(MessageBuilder.withPayload(updatedFieldsEntity).build());
    log.info("sending to completeness score" + updatedFieldsEntity.toString());
  }

  public void sendToSuitabilityScore(UpdatedFieldsEntity updatedFieldsEntity) {
    customChannels
        .publishMessageToSuitabilityScore()
        .send(MessageBuilder.withPayload(updatedFieldsEntity).build());
    log.info("sending to suitability score" + updatedFieldsEntity.toString());
  }
}

这就是我在不同队列中过滤和发布的方式:

 @Autowired
  private EventProducer producer;

  public UpdatedFieldsEntity CheckUpdatedKey(UpdatedFieldsEntity updatedFieldsEntity)
      throws RezoomexException {
    logger.info("\n\n Checking UpdateKeys " + updatedFieldsEntity.toString());
    if (updatedFieldsEntity == null) {
      RezoomexException exception = new RezoomexException("update message is null");
      throw exception;
    }
    for (UpdatedFields updatedFields : updatedFieldsEntity.getUpdatedFields()) {
      UpdateKey element = updatedFields.getUpdateKey();
      if (element.toString().equals(TECHNOLOGIES_UNDER_SKILLSET) || element.toString()
          .equals(TOTAL_EXPERIENCE_VALUE)
          || element.toString().equals(TECHNOLOGIES) || element.toString()
          .equals(TOTAL_EXPERIENCE_SUFFIX)) {
        IsThreeScores = true;
      }

    }
    if (IsThreeScores == true) {
      logger.info("\n\n\n publishing message to all Q");
      producer.sendToQuickScore(updatedFieldsEntity);
      producer.sendToSuitabilityScore(updatedFieldsEntity);
      producer.sendToCompletenessScore(updatedFieldsEntity);
      IsThreeScores = false;
    } else {
      logger.info("\n\n\n publishing message to 2 Q");
      producer.sendToSuitabilityScore(updatedFieldsEntity);
      producer.sendToCompletenessScore(updatedFieldsEntity);
    }
    return updatedFieldsEntity;
  }
}

第一次所有队列都使用消息,但第二次三个队列中的任何一个都会抛出异常:

    org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception

Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
    ... 43 common frames omitted

标签: spring-bootrabbitmq

解决方案


问题是因为您使用相同的输入和输出通道(即从同一通道消费消息并在队列中发布消息)。有不同的消费渠道,例如:-

  @Output("QuickScore")
  MessageChannel publishMessageToQuickScore();

  @Input("Score")
  SubscribableChannel receivedAtQuickScore();

在您的代码中更改@input 或@output 的频道名称。


推荐阅读