首页 > 解决方案 > Spring Integration - 将参数和流从 WebFlux 入站网关传递到 WebFlux 出站网关

问题描述

我正在尝试使用与 @Bean 定义混合的 DSL 样式流来设置 Spring 集成流。在此示例中,我尝试将传入的 REST 请求 (restCustomerGateway) 作为入站 webflux 网关进行调解。我看到您可以使用 .payloadExpression 将内容从请求中提取出来(在这种情况下,id 路径参数......如果有更好或更多类型安全的方法会很有趣)。

然后,我将其流入 webflex 出站网关 (restCustomerSource) 以进行下游调用,然后将其作为响应返回到入站网关。请注意,最终会有变压器来进行有效载荷转换/等。

简单的问题是,我如何构建它以便我可以访问“id”(路径参数,当前在出站网关调用中硬编码为“1”)?我假设这是在两者之间流动的消息有效负载的一部分,但我该如何处理它呢?

@Bean
public WebFluxInboundEndpoint restCustomerGateway() {
    return WebFlux.inboundGateway("/rest/customers/{id}")
        .requestMapping(m -> m.produces(MediaType.APPLICATION_JSON_VALUE)).payloadExpression("#pathVariables.id")
        .get();
}

@Bean
public WebFluxRequestExecutingMessageHandler restCustomerSource() {
    return WebFlux.outboundGateway("http://localhost:8080/customers/1").httpMethod(HttpMethod.GET)
        .expectedResponseType(Customer.class)
        .get();
}

@Bean
public IntegrationFlow restCustomerFlow(CustomerProcessor customerProcessor) {
    return IntegrationFlows
        .from(restCustomerGateway())
        .handle(restCustomerSource())
        .handle(customerProcessor)
        .get();
}

标签: spring-integration

解决方案


有一个

/**
 * Specify a {@link Function} to evaluate in order to generate the Message payload.
 * @param payloadFunction The payload {@link Function}.
 * @param <P> the expected HTTP request body type.
 * @return the spec
 * @see HttpRequestHandlingEndpointSupport#setPayloadExpression(Expression)
 */
public <P> S payloadFunction(Function<HttpEntity<P>, ?> payloadFunction) {

在 上WebFluxInboundEndpointSpec,但您无权访问评估上下文变量,甚至无法访问原始ServerWebExchange,函数中只有RequestEntity可用。

由于您将该id路径变量存储到消息的有效负载中以通过 向下游推送payloadExpression("#pathVariables.id"),因此它确实可以在WebFlux.outboundGateway()for 访问中使用。

你现在有 hard-coded uri,但你可以使用这个变体:

/**
 * Create an {@link WebFluxMessageHandlerSpec} builder for request-reply gateway
 * based on provided {@code Function} to evaluate target {@code uri} against request message.
 * @param uriFunction the {@code Function} to evaluate {@code uri} at runtime.
 * @param <P> the expected payload type.
 * @return the WebFluxMessageHandlerSpec instance
 */
public static <P> WebFluxMessageHandlerSpec outboundGateway(Function<Message<P>, ?> uriFunction) {

所以,因此你的配置变成这样:

WebFlux.outboundGateway(m -> "http://localhost:8080/customers/" + m.getPayload())

推荐阅读