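spring-boot - I set up three RabbitMQ queues with Spring Cloud, and one of the queues fails at random
Problem description
When my consumer receives a message, I filter it based on its content, and the filter then sends the message to 3 different queues using three different producers.
Here is the configuration for the three queues: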
cloud:
  stream:
    bindings:
      QuickScore:
        concurrency: 5
        destination: quickScore
        content-type: application/json
        group: quickScoreGroup
        maxConcurrency: 10
        recoveryInterval: 10000
      SuitabilityScore:
        concurrency: 5
        destination: suitabilityScore
        content-type: application/json
        group: suitabilityScore
        maxConcurrency: 10
        recoveryInterval: 10000
      CompletenessScore:
        concurrency: 5
        destination: completenessScore
        content-type: application/json
        group: completenessScore
        maxConcurrency: 10
        recoveryInterval: 10000
rabbitmq:
  host: ${rabbitmq.host:localhost}
  username: guest
  password: guest
  port: 5672
Custom channels for the queues:
public interface CustomChannels {

    @Output("QuickScore")
    MessageChannel publishMessageToQuickScore();

    @Input("QuickScore")
    SubscribableChannel receivedAtQuickScore();

    @Output("CompletenessScore")
    MessageChannel publishMessageToCompletenessScore();

    @Input("CompletenessScore")
    SubscribableChannel receivedAtCompletenessScore();

    @Output("SuitabilityScore")
    MessageChannel publishMessageToSuitabilityScore();

    @Input("SuitabilityScore")
    SubscribableChannel receivedAtSuitabilityScore();
}
Producers for the queues:
    @Autowired
    private CustomChannels customChannels;

    public void sendToQuickScore(UpdatedFieldsEntity updatedFieldsEntity) {
        customChannels
            .publishMessageToQuickScore()
            .send(MessageBuilder.withPayload(updatedFieldsEntity).build());
        log.info("sending to Quick score" + updatedFieldsEntity.toString());
    }

    public void sendToCompletenessScore(UpdatedFieldsEntity updatedFieldsEntity) {
        customChannels
            .publishMessageToCompletenessScore()
            .send(MessageBuilder.withPayload(updatedFieldsEntity).build());
        log.info("sending to completeness score" + updatedFieldsEntity.toString());
    }

    public void sendToSuitabilityScore(UpdatedFieldsEntity updatedFieldsEntity) {
        customChannels
            .publishMessageToSuitabilityScore()
            .send(MessageBuilder.withPayload(updatedFieldsEntity).build());
        log.info("sending to suitability score" + updatedFieldsEntity.toString());
    }
}
This is how I filter the message and publish it to the different queues:
    @Autowired
    private EventProducer producer;

    public UpdatedFieldsEntity CheckUpdatedKey(UpdatedFieldsEntity updatedFieldsEntity)
            throws RezoomexException {
        // Check for null before logging: calling toString() on a null message would throw first.
        if (updatedFieldsEntity == null) {
            throw new RezoomexException("update message is null");
        }
        logger.info("\n\n Checking UpdateKeys " + updatedFieldsEntity.toString());

        // Any of these keys means all three scores must be recalculated.
        for (UpdatedFields updatedFields : updatedFieldsEntity.getUpdatedFields()) {
            UpdateKey element = updatedFields.getUpdateKey();
            if (element.toString().equals(TECHNOLOGIES_UNDER_SKILLSET)
                    || element.toString().equals(TOTAL_EXPERIENCE_VALUE)
                    || element.toString().equals(TECHNOLOGIES)
                    || element.toString().equals(TOTAL_EXPERIENCE_SUFFIX)) {
                IsThreeScores = true;
            }
        }

        if (IsThreeScores == true) {
            logger.info("\n\n\n publishing message to all Q");
            producer.sendToQuickScore(updatedFieldsEntity);
            producer.sendToSuitabilityScore(updatedFieldsEntity);
            producer.sendToCompletenessScore(updatedFieldsEntity);
            IsThreeScores = false;
        } else {
            logger.info("\n\n\n publishing message to 2 Q");
            producer.sendToSuitabilityScore(updatedFieldsEntity);
            producer.sendToCompletenessScore(updatedFieldsEntity);
        }
        return updatedFieldsEntity;
    }
}
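The first time, all of the queues consume the message, but the second time one of the three queues (it varies which) throws this exception: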
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.14.RELEASE.jar!/:4.3.14.RELEASE]
... 43 common frames omitted
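Solution
The problem is that you are using the same channel for both input and output (that is, you consume messages from the same channel that you publish to the queue with). Use a different channel for consuming, for example: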
@Output("QuickScore")
MessageChannel publishMessageToQuickScore();
@Input("Score")
SubscribableChannel receivedAtQuickScore();
Change the channel name of either the @Input or the @Output in your code so that the two no longer match.
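To make the rename concrete, here is a minimal sketch in plain Java of what the corrected interface could look like for all three queues, together with a small reflection check that reports any name used for both directions. It is not Spring code: the `Input`, `Output`, `MessageChannel` and `SubscribableChannel` types below are stand-ins declared only so that the example compiles without Spring on the classpath. The `...Input` channel names and the `namesUsedForBothDirections` helper are assumptions made for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Set;
import java.util.TreeSet;

public class ChannelNameCheck {

    // Stand-ins for Spring Cloud Stream's @Input/@Output (assumption: declared
    // here only so the sketch runs without Spring).
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Input { String value(); }

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface Output { String value(); }

    // Empty stand-ins for the Spring channel types.
    public interface MessageChannel {}
    public interface SubscribableChannel {}

    // Layout from the question: the same name is used for input and output.
    public interface SharedNames {
        @Output("QuickScore") MessageChannel publishMessageToQuickScore();
        @Input("QuickScore") SubscribableChannel receivedAtQuickScore();
    }

    // Layout after the fix: every input has its own (hypothetical) name.
    public interface DistinctNames {
        @Output("QuickScore") MessageChannel publishMessageToQuickScore();
        @Input("QuickScoreInput") SubscribableChannel receivedAtQuickScore();
        @Output("CompletenessScore") MessageChannel publishMessageToCompletenessScore();
        @Input("CompletenessScoreInput") SubscribableChannel receivedAtCompletenessScore();
        @Output("SuitabilityScore") MessageChannel publishMessageToSuitabilityScore();
        @Input("SuitabilityScoreInput") SubscribableChannel receivedAtSuitabilityScore();
    }

    // Returns the channel names that appear on both an @Input and an @Output method.
    public static Set<String> namesUsedForBothDirections(Class<?> channels) {
        Set<String> inputs = new TreeSet<>();
        Set<String> outputs = new TreeSet<>();
        for (Method method : channels.getDeclaredMethods()) {
            Input in = method.getAnnotation(Input.class);
            Output out = method.getAnnotation(Output.class);
            if (in != null) {
                inputs.add(in.value());
            }
            if (out != null) {
                outputs.add(out.value());
            }
        }
        inputs.retainAll(outputs); // keep only the names present in both sets
        return inputs;
    }

    public static void main(String[] args) {
        System.out.println(namesUsedForBothDirections(SharedNames.class));   // prints [QuickScore]
        System.out.println(namesUsedForBothDirections(DistinctNames.class)); // prints []
    }
}
```

Binding properties are keyed by channel name, so each renamed input also needs its own entry under `bindings`, with a `destination` that matches the corresponding output's destination (for example `quickScore`). Otherwise the consumer would listen on a different destination from the one the producer writes to.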