spring-boot - 如何根据 HttpOutBoundGateway 的失败 HttpStatus (401,400) 重试消息
问题描述
基本上我的用例是在 HttpOutboundGateway 请求中发生 401 时重试 http 请求。该请求来自一个 jms 代理进入集成流。
@Bean
IntegrationFlow bank2wallet(ConnectionFactory jmsConnectionFactory,
MessageHandler creditWalletHttpGateway) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.destination(cp.getTransactionIn()))
.<String, CreditRequest>transform(
request -> new Gson().fromJson(request, CreditRequest.class))
.enrichHeaders((headerEnricherSpec -> {
// Todo get token from cache
headerEnricherSpec.header(HttpHeaders.AUTHORIZATION, String.join(" ", "Bearer", ""));
headerEnricherSpec.header(HttpHeaders.ACCEPT, "application/json");
headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, "application/json");
}))
.handle(creditWalletHttpGateway, (e) -> e.advice(retryAdvice()))
.get();
}
@Bean
MessageHandler creditWalletHttpGateway( @Value("${api.base.uri:https:/localhost/v3/sync}") URI uri) {
HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler(uri);
httpHandler.setExpectedResponseType(CreditResponse.class);
httpHandler.setHttpMethod(HttpMethod.POST);
return httpHandler;
}
@Bean
RequestHandlerRetryAdvice retryAdvice() {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRecoveryCallback(errorMessageSendingRecoverer());
return requestHandlerRetryAdvice;
}
@Bean
ErrorMessageSendingRecoverer errorMessageSendingRecoverer() {
return new ErrorMessageSendingRecoverer(recoveryChannel());
}
@Bean
MessageChannel recoveryChannel() {
return new DirectChannel();
}
@Bean
MessageChannel retryChannel() {
return new DirectChannel();
}
@Bean
IntegrationFlow handleRecovery() {
return IntegrationFlows.from("recoveryChannel")
.log(Level.ERROR, "error", m -> m.getPayload())
.<RuntimeException>handle((message) -> {
MessagingException exception = (MessagingException) message.getPayload();
Message<CreditRequest> originalCreditRequest = (Message<CreditRequest>) exception.getFailedMessage();
// String token = gateway.getToken(configProperties);
String token = UUID.randomUUID().toString();
Message<CreditRequest> c = MessageBuilder.fromMessage(originalCreditRequest)
.setHeader(ApiConstants.AUTHORIZATION, String.join(" ", "Bearer", token))
.copyHeaders(message.getHeaders())
.build();
retryChannel().send(c);
})
.get();
}
@Bean
IntegrationFlow creditRequestFlow() {
return IntegrationFlows.from(retryChannel())
.log(Level.INFO, "info", m -> m.getPayload())
.handle(Http.outboundGateway("https://localhost/v3/sync")
.httpMethod(HttpMethod.POST)
.expectedResponseType(CreditResponse.class))
.get();
}
标头用适当的 http 标头丰富,然后我有一个建议,用默认的简单策略重试请求,RequestHandlerAdvice 方法的问题是它将handleRecovery Flow 中的异常消息默认为无 HttpException 类(MessageException),因此我无法检查 HttpStatus 代码以重新路由消息。所以我的问题基本上是我如何设计一个基于 HttpStatus 401 重试 HttpOutBoundRequest 的流程。
解决方案
我通过引入网关来进行出站 http 调用并使用递归方式对其进行管理来解决了这个问题
@MessagingGateway
public interface B2WGateway {
/**
*
* @param message
* @return
*/
@Gateway(requestChannel = "credit.input")
CreditResponse bankToWallet(Message<CreditRequest> message);
}
然后隔离http outbound 集成流程
@Bean
IntegrationFlow credit() {
return f -> f.log(Level.INFO, "info", m -> m.getHeaders())
.handle(Http.outboundGateway(configProperties.getBankToWalletUrl())
.httpMethod(HttpMethod.POST)
.expectedResponseType(CreditResponse.class)
.errorHandler(new ResponseErrorHandler() {
@Override
public boolean hasError(ClientHttpResponse clientHttpResponse) throws IOException {
return clientHttpResponse.getStatusCode().equals(HttpStatus.UNAUTHORIZED);
}
@Override
public void handleError(ClientHttpResponse clientHttpResponse) throws IOException {
throw new AuthenticationRequiredException("Authentication Required");
}
}));
}
然后解析消息handleRecovery获取token刷新后发送消息
@Bean
IntegrationFlow handleRecovery() {
return IntegrationFlows.from("recoveryChannel")
.log(Level.ERROR, "error", m -> m.getPayload())
.<RuntimeException>handle((p, h) -> {
MessageHandlingExpressionEvaluatingAdviceException exception = (MessageHandlingExpressionEvaluatingAdviceException) p;
Message<CreditRequest> originalCreditRequest = (Message<CreditRequest>) exception
.getFailedMessage();
// String token = gateway.getToken(configProperties);
String token = UUID.randomUUID().toString();
Message<CreditRequest> c = MessageBuilder.fromMessage(originalCreditRequest)
.setHeader(ApiConstants.AUTHORIZATION, String.join(" ", "Bearer", token))
.copyHeaders(h)
.build();
return c;
})
.channel("credit.input")
.get();
}
然后修改流程的开始以使用网关服务和表达建议。
@Bean
IntegrationFlow bank2wallet(ConnectionFactory jmsConnectionFactory) {
return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.destination(cp.getTransactionIn()))
.<String, CreditRequest>transform(
request -> new Gson().fromJson(request, CreditRequest.class))
.enrichHeaders((headerEnricherSpec -> {
// Todo get token from cache
headerEnricherSpec.header(HttpHeaders.AUTHORIZATION, String.join(" ", "Bearer", ""));
headerEnricherSpec.header(HttpHeaders.ACCEPT, "application/json");
headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, "application/json");
}))
.handle((GenericHandler<CreditRequest>) (creditRequest, headers) -> gateway
.bankToWallet(MessageBuilder.withPayload(creditRequest)
.copyHeaders(headers)
.build()), (e) -> e.advice(retryAdvice()))
.get();
}
Spring Integration的启示——管理 http 出站适配器调用中的 401 错误
推荐阅读
- symfony4 - 如何在 flashbag 中显示文件异常(try/catch)?
- reactjs - reactstrap 和 Chrome 自动填充将密码放入电子邮件输入中
- amazon-web-services - AWS MSK Kafka 不支持 Ubuntu 客户端 awscli?
- c - 了解stm8s反汇编中的功能?
- c++ - 将变量传递给 lambda
- php - 在异常情况下继续 Laravel 工作
- elasticsearch - 选择并更新所有匹配的文档
- javascript - 没有 setInterval() 的预定消息
- c - C 中 send() 的最大 size_t 值
- python - 冒泡排序未按正确顺序输出(浮点数)