How to retry a message based on a failed HttpStatus (401, 400) from an HttpOutboundGateway

Problem description

My use case is to retry an HTTP request when a 401 occurs in an HttpOutboundGateway request. The request enters the integration flow from a JMS broker.

 @Bean
  IntegrationFlow bank2wallet(ConnectionFactory jmsConnectionFactory,
      MessageHandler creditWalletHttpGateway) {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
        .destination(cp.getTransactionIn()))
        .<String, CreditRequest>transform(
            request -> new Gson().fromJson(request, CreditRequest.class))

        .enrichHeaders((headerEnricherSpec -> {
          // Todo get token from cache
          headerEnricherSpec.header(HttpHeaders.AUTHORIZATION, String.join(" ", "Bearer", ""));
          headerEnricherSpec.header(HttpHeaders.ACCEPT, "application/json");
          headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, "application/json");
        }))
        .handle(creditWalletHttpGateway, (e) -> e.advice(retryAdvice()))
        .get();
  }


@Bean
  MessageHandler creditWalletHttpGateway( @Value("${api.base.uri:https://localhost/v3/sync}") URI uri) {
    HttpRequestExecutingMessageHandler httpHandler = new HttpRequestExecutingMessageHandler(uri);
    httpHandler.setExpectedResponseType(CreditResponse.class);
    httpHandler.setHttpMethod(HttpMethod.POST);
    return httpHandler;
  }

 @Bean
  RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
    requestHandlerRetryAdvice.setRecoveryCallback(errorMessageSendingRecoverer());
    return requestHandlerRetryAdvice;
  }

  @Bean
  ErrorMessageSendingRecoverer errorMessageSendingRecoverer() {
    return new ErrorMessageSendingRecoverer(recoveryChannel());
  }

  @Bean
  MessageChannel recoveryChannel() {
    return new DirectChannel();
  }

  @Bean
  MessageChannel retryChannel() {
    return new DirectChannel();
  }

  @Bean
  IntegrationFlow handleRecovery() {
    return IntegrationFlows.from("recoveryChannel")
        .log(Level.ERROR, "error", m -> m.getPayload())
        .<RuntimeException>handle((message) -> {

          
          MessagingException exception = (MessagingException) message.getPayload();
          Message<CreditRequest> originalCreditRequest = (Message<CreditRequest>) exception.getFailedMessage();
          // String token = gateway.getToken(configProperties);
          String token = UUID.randomUUID().toString();
          Message<CreditRequest> c = MessageBuilder.fromMessage(originalCreditRequest)
              .setHeader(ApiConstants.AUTHORIZATION, String.join(" ", "Bearer", token))
              .copyHeaders(message.getHeaders())
              .build();

          retryChannel().send(c);

        })
        .get();
  }

  @Bean
  IntegrationFlow creditRequestFlow() {

    return IntegrationFlows.from(retryChannel())
        .log(Level.INFO, "info", m ->  m.getPayload())
        .handle(Http.outboundGateway("https://localhost/v3/sync")
        .httpMethod(HttpMethod.POST)
        .expectedResponseType(CreditResponse.class))
        .get();

  }

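The message is enriched with the appropriate HTTP headers, and an advice then retries the request using the default simple policy. The problem with the RequestHandlerAdvice approach is that the exception delivered to the handleRecovery flow is a MessagingException rather than an HTTP exception class, so I cannot check the HttpStatus code in order to reroute the message. My question, then, is how to design a flow that retries the HTTP outbound request based on HttpStatus 401.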
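One way to get at a status code hidden behind a wrapper exception is to walk the cause chain. The following is a minimal plain-Java sketch of that idea, not Spring Integration code: `WrapperException` and `HttpStatusException` are hypothetical stand-ins for the messaging wrapper and the HTTP client exception, and it assumes the wrapper keeps the HTTP exception as its cause, which is worth verifying against the Spring Integration version in use.

```java
import java.util.Optional;

public class CauseChainSketch {

    // Hypothetical stand-in for an HTTP client exception that carries a status code.
    static class HttpStatusException extends RuntimeException {
        final int statusCode;

        HttpStatusException(int statusCode, String message) {
            super(message);
            this.statusCode = statusCode;
        }
    }

    // Hypothetical stand-in for the wrapper exception seen in the recovery flow.
    static class WrapperException extends RuntimeException {
        WrapperException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Walks the cause chain and returns the first exception of the requested type.
    static <T extends Throwable> Optional<T> findCause(Throwable error, Class<T> type) {
        Throwable current = error;
        int depth = 0;
        while (current != null && depth++ < 32) { // depth cap guards against cyclic chains
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
        }
        return Optional.empty();
    }

    // True only when an HTTP exception with status 401 is found somewhere in the chain.
    static boolean isUnauthorized(Throwable error) {
        return findCause(error, HttpStatusException.class)
                .map(e -> e.statusCode == 401)
                .orElse(false);
    }

    public static void main(String[] args) {
        Throwable wrapped = new WrapperException("HTTP request failed",
                new HttpStatusException(401, "Unauthorized"));
        System.out.println(isUnauthorized(wrapped));                       // true
        System.out.println(isUnauthorized(new RuntimeException("other"))); // false
    }
}
```

In a real recovery flow the same lookup would be applied to the payload of the error message, with the actual exception types substituted for the stand-ins.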

Tags: spring-boot, spring-integration, spring-integration-dsl

Solution


I solved this by introducing a gateway to make the outbound HTTP call and managing it recursively.

@MessagingGateway
public interface B2WGateway {

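    /**
     * Sends a credit request through the "credit.input" channel.
     *
     * @param message the request message carrying the CreditRequest payload and HTTP headers
     * @return the CreditResponse returned by the outbound HTTP call
     */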
    @Gateway(requestChannel = "credit.input")
    CreditResponse bankToWallet(Message<CreditRequest> message);


}

Then isolate the HTTP outbound integration flow:

 @Bean
  IntegrationFlow credit() {

    return f -> f.log(Level.INFO, "info", m -> m.getHeaders())
        .handle(Http.outboundGateway(configProperties.getBankToWalletUrl())
            .httpMethod(HttpMethod.POST)
            .expectedResponseType(CreditResponse.class)
        .errorHandler(new ResponseErrorHandler() {
          @Override
          public boolean hasError(ClientHttpResponse clientHttpResponse) throws IOException {
            return clientHttpResponse.getStatusCode().equals(HttpStatus.UNAUTHORIZED);
          }

          @Override
          public void handleError(ClientHttpResponse clientHttpResponse) throws IOException {
              throw new AuthenticationRequiredException("Authentication Required");
          }
        }));
  }

Then, in handleRecovery, parse the failed message, refresh the token, and resend the message:

@Bean
  IntegrationFlow handleRecovery() {
    return IntegrationFlows.from("recoveryChannel")
        .log(Level.ERROR, "error", m -> m.getPayload())
        .<RuntimeException>handle((p, h) -> {
          MessageHandlingExpressionEvaluatingAdviceException exception = (MessageHandlingExpressionEvaluatingAdviceException) p;
          Message<CreditRequest> originalCreditRequest = (Message<CreditRequest>) exception
              .getFailedMessage();
          // String token = gateway.getToken(configProperties);
          String token = UUID.randomUUID().toString();
          Message<CreditRequest> c = MessageBuilder.fromMessage(originalCreditRequest)
              .setHeader(ApiConstants.AUTHORIZATION, String.join(" ", "Bearer", token))
              .copyHeaders(h)
              .build();
          return c;
        })
        .channel("credit.input")
        .get();
  }
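The refresh-and-retry idea behind the gateway and the recovery flow can be illustrated without Spring. The sketch below is a plain-Java analogue under stated assumptions: `callWithTokenRefresh`, `tokenRefresher`, and `maxAttempts` are hypothetical names introduced here, and the local `AuthenticationRequiredException` only mirrors the exception thrown by the error handler above. It adds an explicit attempt cap so that a persistently rejected token cannot cause endless retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class RetryOnUnauthorizedSketch {

    // Local stand-in mirroring the exception raised on a 401 response.
    static class AuthenticationRequiredException extends RuntimeException {
        AuthenticationRequiredException(String message) {
            super(message);
        }
    }

    // Invokes the call with the current token; on a 401-style failure it
    // obtains a new token and tries again, up to maxAttempts calls in total.
    static <R> R callWithTokenRefresh(String initialToken,
                                      Supplier<String> tokenRefresher,
                                      Function<String, R> call,
                                      int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        String token = initialToken;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.apply(token);
            } catch (AuthenticationRequiredException e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the last failure
                }
                token = tokenRefresher.get();
            }
        }
    }

    public static void main(String[] args) {
        List<String> tokensSeen = new ArrayList<>();
        String result = callWithTokenRefresh(
                "",                   // empty token, as in the header enricher above
                () -> "fresh-token",  // hypothetical token refresh
                token -> {
                    tokensSeen.add(token);
                    if (!"fresh-token".equals(token)) {
                        throw new AuthenticationRequiredException("Authentication Required");
                    }
                    return "credited";
                },
                3);
        System.out.println(result + " after " + tokensSeen.size() + " attempts");
    }
}
```

Only the authentication failure triggers a retry here; any other exception propagates immediately, which matches the intent of retrying on 401 alone.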

Then modify the start of the flow to use the gateway service and the expression advice:

 @Bean
  IntegrationFlow bank2wallet(ConnectionFactory jmsConnectionFactory) {
    return IntegrationFlows.from(Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
        .destination(cp.getTransactionIn()))
        .<String, CreditRequest>transform(
            request -> new Gson().fromJson(request, CreditRequest.class))

        .enrichHeaders((headerEnricherSpec -> {
          // Todo get token from cache
          headerEnricherSpec.header(HttpHeaders.AUTHORIZATION, String.join(" ", "Bearer", ""));
          headerEnricherSpec.header(HttpHeaders.ACCEPT, "application/json");
          headerEnricherSpec.header(HttpHeaders.CONTENT_TYPE, "application/json");
        }))
        .handle((GenericHandler<CreditRequest>) (creditRequest, headers) -> gateway
            .bankToWallet(MessageBuilder.withPayload(creditRequest)
                .copyHeaders(headers)
                .build()), (e) -> e.advice(retryAdvice()))
        .get();
  }

Inspired by: Spring Integration - Manage 401 Error in http outbound adapter call

