spring - 在 Spring 集成中使用 WebFlux 出站网关进行错误处理
问题描述
我试图了解如何在 spring webflux 集成中的出站网关中处理错误。
在没有webflux int-http:outbound-gateway的 spring 集成中具有如下错误处理程序:
<int-http:outbound-gateway
http-method="GET"
url-expression="url"
expected-response-type="java.lang.String"
error-handler="accessErrorHandler"
header-mapper="headerMapper"
/>
但是在与webflux int-webflux:outbound-gateway 的 spring 集成中没有错误处理程序
<int-webflux:outbound-gateway
http-method="GET"
url-expression="url"
expected-response-type="java.lang.String"
header-mapper="headerMapper"
/>
这是我对 pom.xml 的依赖:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-webflux</artifactId>
<version>5.2.0.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
解决方案
Spring Integration HTTP 模块完全基于RestTemplate
Spring Web。那个有提到ErrorHandler
它的同步请求。
Spring Integration WebFlux 模块完全基于 Spring WebFlux 基础的非阻塞WebClient
。内部逻辑基于 Project Reactor 类型,例如Flux
和Mono
。为了尊重反应流规范,WebFluxRequestExecutingMessageHandler
只返回一个Mono
for 响应。如果在与服务器交互期间出现一些错误,我们会在那里:
requestSpec.exchange()
.flatMap(response -> {
HttpStatus httpStatus = response.statusCode();
if (httpStatus.isError()) {
return response.body(BodyExtractors.toDataBuffers())
.reduce(DataBuffer::write)
.map(dataBuffer -> {
byte[] bytes = new byte[dataBuffer.readableByteCount()];
dataBuffer.read(bytes);
DataBufferUtils.release(dataBuffer);
return bytes;
})
.defaultIfEmpty(new byte[0])
.map(bodyBytes -> {
throw new WebClientResponseException(
"ClientResponse has erroneous status code: "
+ httpStatus.value() + " "
+ httpStatus.getReasonPhrase(),
httpStatus.value(),
httpStatus.getReasonPhrase(),
response.headers().asHttpHeaders(),
bodyBytes,
response.headers().contentType()
.map(MimeType::getCharset)
.orElse(StandardCharsets.ISO_8859_1));
}
);
}
else {
return Mono.just(response);
}
});
所以,有些WebClientResponseException
人会被扔进回复Mono
中。在任何反应式或非反应式下游,这样的异常将被处理如下:
protected void sendErrorMessage(Message<?> requestMessage, Throwable ex) {
Object errorChannel = resolveErrorChannel(requestMessage.getHeaders());
Throwable result = ex;
if (!(ex instanceof MessagingException)) {
result = new MessageHandlingException(requestMessage, ex);
}
if (errorChannel == null) {
logger.error("Async exception received and no 'errorChannel' header exists and no default "
+ "'errorChannel' found", result);
}
else {
try {
sendOutput(new ErrorMessage(result), errorChannel, true);
}
catch (Exception e) {
Exception exceptionToLog =
IntegrationUtils.wrapInHandlingExceptionIfNecessary(requestMessage,
() -> "failed to send error message in the [" + this + ']', e);
logger.error("Failed to send async reply", exceptionToLog);
}
}
}
从请求消息的标头中errorChannel
提取并回退到全局IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME
.
有了所有这些,您应该订阅这样的错误通道来
WebClientResponseException
分别处理这些实例。RestTemplate
在 Spring Framework 文档中查看更多信息: https ://docs.spring.io/spring/docs/current/spring-framework-reference/integration.html#rest-client-access
推荐阅读
- java - 无法使用 AWSIotMqttManager 和 Aws Cognito 凭证连接到 Aws Iot
- javascript - 如何在 Javascript 中实现这个 C++ 模式?
- gitlab - 有没有办法镜像 Gitlab 内置的错误跟踪内容?
- sql - 打印(选择)学生表中的学生ID,学生姓名,其中任何两门科目的分数大于90(共6门)
- walmart-api - 收据 API 可用性?
- c++ - 指针范围问题和指针在类中封装的指针向量内的返回引用
- javascript - 单击我网站上的 href 链接后如何访问其他页面的元素?
- python-3.x - Can I break values into buckets?
- php - 无法使用 PHP 在 Azure blob 容器上上传 wav 文件
- javascript - 状态与身份验证一起使用