Error handling with the WebFlux outbound gateway in Spring Integration

Question

I am trying to understand how to handle errors in an outbound gateway when using Spring Integration with WebFlux.

In Spring Integration without WebFlux, int-http:outbound-gateway has an error-handler attribute, as follows:

<int-http:outbound-gateway
        http-method="GET"
        url-expression="url"
        expected-response-type="java.lang.String"
        error-handler="accessErrorHandler"
        header-mapper="headerMapper"
        />

But in Spring Integration with WebFlux, int-webflux:outbound-gateway has no error-handler attribute:

<int-webflux:outbound-gateway
                http-method="GET"
                url-expression="url"
                expected-response-type="java.lang.String"
                header-mapper="headerMapper"
        />

These are the dependencies in my pom.xml:

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-webflux</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

Tags: spring, spring-integration, spring-webflux

Answer


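The Spring Integration HTTP module is fully based on the RestTemplate from Spring Web. That class is where the ErrorHandler you mention comes from, and it applies to RestTemplate's synchronous requests.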

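The Spring Integration WebFlux module is fully based on the non-blocking WebClient from the Spring WebFlux foundation. Its internal logic is built on Project Reactor types such as Flux and Mono. To honor the Reactive Streams specification, WebFluxRequestExecutingMessageHandler simply returns a Mono for the response. If an error occurs while interacting with the server, this is the code that deals with it: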

requestSpec.exchange()
                    .flatMap(response -> {
                        HttpStatus httpStatus = response.statusCode();
                        if (httpStatus.isError()) {
                            return response.body(BodyExtractors.toDataBuffers())
                                    .reduce(DataBuffer::write)
                                    .map(dataBuffer -> {
                                        byte[] bytes = new byte[dataBuffer.readableByteCount()];
                                        dataBuffer.read(bytes);
                                        DataBufferUtils.release(dataBuffer);
                                        return bytes;
                                    })
                                    .defaultIfEmpty(new byte[0])
                                    .map(bodyBytes -> {
                                                throw new WebClientResponseException(
                                                        "ClientResponse has erroneous status code: "
                                                                + httpStatus.value() + " "
                                                                + httpStatus.getReasonPhrase(),
                                                        httpStatus.value(),
                                                        httpStatus.getReasonPhrase(),
                                                        response.headers().asHttpHeaders(),
                                                        bodyBytes,
                                                        response.headers().contentType()
                                                                .map(MimeType::getCharset)
                                                                .orElse(StandardCharsets.ISO_8859_1));
                                            }
                                    );
                        }
                        else {
                            return Mono.just(response);
                        }
                    });

So a WebClientResponseException is thrown into the reply Mono. Whether the downstream flow is reactive or not, such an exception is then handled like this:

protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
    Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
    Throwable result = ex;
    if (!(ex instanceof MessagingException)) {
        result = new MessageHandlingException(requestMessage, ex);
    }
    if (errorChannel == null) {
        logger.error("Async exception received and no 'errorChannel' header exists and no default "
                + "'errorChannel' found", result);
    }
    else {
        try {
            sendOutput(new ErrorMessage(result), errorChannel, true);
        }
        catch (Exception e) {
            Exception exceptionToLog =
                    IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage,
                            () -> "failed to send error message in the [" + this + ']', e);
            logger.error("Failed to send async reply", exceptionToLog);
        }
    }
}

Here the errorChannel is extracted from the request message's headers, falling back to the global IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME.

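With all that said, you should subscribe to that error channel and handle the WebClientResponseException instances there. For more information about RestTemplate, see the Spring Framework documentation: https://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#rest-client-access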
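
As sendErrorMessage shows, an exception that is not already a MessagingException gets wrapped in a MessageHandlingException before it is sent as an ErrorMessage, so a subscriber on the error channel has to walk the cause chain to reach the WebClientResponseException. A minimal plain-Java sketch of that unwrapping step follows. The class name ErrorPayloads and the helper findCause are hypothetical, not part of Spring Integration, and the exceptions in main are JDK stand-ins for the Spring types.

```java
import java.util.Optional;

public class ErrorPayloads {

    // Hypothetical helper: walks the cause chain of 'error' and returns the
    // first throwable that is an instance of 'type', if there is one.
    public static <T extends Throwable> Optional<T> findCause(Throwable error, Class<T> type) {
        Throwable current = error;
        int depth = 0;
        // The depth limit guards against a cyclic cause chain.
        while (current != null && depth < 32) {
            if (type.isInstance(current)) {
                return Optional.of(type.cast(current));
            }
            current = current.getCause();
            depth++;
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Stand-ins: IllegalStateException plays the role of WebClientResponseException,
        // RuntimeException plays the role of the wrapping MessageHandlingException.
        Throwable original = new IllegalStateException("404 Not Found");
        Throwable wrapped = new RuntimeException("handler failed", original);

        String result = findCause(wrapped, IllegalStateException.class)
                .map(Throwable::getMessage)
                .orElse("no match");
        System.out.println(result); // prints "404 Not Found"
    }
}
```

In a real flow, a handler subscribed to errorChannel would presumably call something like this on the ErrorMessage payload with WebClientResponseException.class, then inspect the status code and response body of the result.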

