java - 防止 AMQP 队列使用者中的无限循环
问题描述
我想在 AMQP 队列侦听器中实现 Web Flux 客户端。我试过这个:
@Component
public class TransactionGenesisAuthorizeListener {
public TransactionResponseFactory transactionGenesisAuthorizeProcess(AuthorizeRequestFactory tf) throws Exception {
AuthorizeResponse response = null;
try {
RestClient client = RestClientBuilder.builder()
.gatewayUrl(URL)
.build();
Mono<AuthorizeResponse> result = client.executeAndReceiveAuthorize(request);
response = result.block();
return parseRawSuccessResponse(response);
}
private TransactionResponseFactory parseRawSuccessResponse(AuthorizeResponse response) {
................
return obj;
}
}
//网络客户端
public Mono<AuthorizeResponse> executeAndReceiveAuthorize(AuthorizeRequest transaction) {
Mono<AuthorizeRequest> transactionMono = Mono.just(transaction);
return client.post().uri(checkTrailingSlash(gatewayUrl) + token)
.header(HttpHeaders.USER_AGENT, "Mozilla/5.0")
.accept(MediaType.APPLICATION_XML)
.contentType(MediaType.APPLICATION_XML)
.body(transactionMono, AuthorizeRequest.class)
.retrieve()
.bodyToMono(AuthorizeResponse.class);
}
错误日志
2019-08-27 19:42:09,280 INFO [stdout] (processingTransactionGenesisAuthorizeContainer-1) 19:42:09.280 [processingTransactionGenesisAuthorizeContainer-1] WARN o.s.a.r.l.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException
2019-08-27 19:42:09,282 INFO [stdout] (processingTransactionGenesisAuthorizeContainer-1) 19:42:09.282 [processingTransactionGenesisAuthorizeContainer-1] INFO o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer@6e537459: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://guest@127.0.0.1:5672/,10), conn: Proxy@70156185 Shared Rabbit Connection: SimpleConnection@631ca718 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 49982], acknowledgeMode=AUTO local queue size=0
但不幸的是,当我使用错误的凭据连接 Web 客户端时,我收到无限错误消息,可能是因为 AMQP 客户端正在重新发送有效负载。有什么方法可以抛出异常并停止重新发送队列内容并一次又一次?
解决方案
考虑捕获异常并将其包装到AmqpRejectAndDontRequeueException
:
/**
* Exception for listener implementations used to indicate the
* basic.reject will be sent with requeue=false in order to enable
* features such as DLQ.
* @author Gary Russell
* @since 1.0.1
*
*/
@SuppressWarnings("serial")
public class AmqpRejectAndDontRequeueException extends AmqpException {
另请参阅文档中有关此异常的一些用例以及重试配置:https ://docs.spring.io/spring-amqp/docs/current/reference/html/#async-listeners
推荐阅读
- c++ - 如何防止手动实例化所有模板类型?
- css - 使用纯 css 为具有特定字体系列的所有元素设置字体大小,没有 Javascript
- c# - 有中断时for循环继续循环
- java - Hibernate/JPA 反射问题
- javascript - 如何在 Android Studio 中为 React Native 应用程序的开发和发布应用程序定义变体
- python - FastAPI:如何访问依赖项中的 APIRoute 对象
- sql - 如何在proc中删除sql中的列?
- c# - 如果数据库表中不存在列,则排除或忽略实体模型的属性
- python - 将 csv 文件作为数据框读取时模式错误
- android - Android相机意图打开前置摄像头而不是后置摄像头