首页 > 解决方案 > spring-integration: How to create a Spring Reactor Flux from WebFlux integration flow?

问题描述

In How to create a Spring Reactor Flux from Http integration flow? artem-bilan mentioned in a comment that it will be possible to use the webflux integration in the future.

Since the time when the comment was written, the WebFlux integration has been factored out to spring-integration-webflux. I have tried the following to replicate the MVC based http->flux integration with a WebFlux based one by replacing the Http.inboundChannelAdapter and the @GetRequest handler of the MVC version with a WebFlux.inboundChannelAdapter and WebFlux.inboundGateway:

@SpringBootApplication
public class WebfluxApplication {

  public static void main(String[] args) {
    SpringApplication.run(WebfluxApplication.class, args);
  }


  @Bean
  public Publisher<Message<String>> reactiveSource() {
    return IntegrationFlows.
            from(WebFlux.inboundChannelAdapter("/message/{id}")
                    .requestMapping(r -> r
                            .methods(HttpMethod.POST)
                    )
                    .payloadExpression("#pathVariables.id")
            )
            .log()
            .channel(MessageChannels.flux())
            .toReactivePublisher();
  }


  @Bean
  public IntegrationFlow eventMessages() {
    return IntegrationFlows
            .from(WebFlux.inboundGateway("/events")
                    .requestMapping(m -> m.produces(MediaType.TEXT_EVENT_STREAM_VALUE)))
            .handle((p, h) -> reactiveSource())                
            .get();
}

}

It appears that the flow in the reactiveSource() publisher does not receive any messages, at least nothing is logged for my .log() statement.

When I replace the reactiveSource() publisher in the eventMessages flow

.handle((p, h) -> reactiveSource()) 

by a fake publisher

.handle((p, h) -> Flux.just("foo", "bar"))

I get SSE responses from

curl localhost:8080/events

The trace log shows that the reactiveSource() POST handler is mapped and the WebFluxInboundEndpoint.handle method is being invoked:

2018-05-05 16:50:58.788  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/message/{id}],methods=[POST]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2018-05-05 16:50:58.789  INFO 6552 --- [           main] xIntegrationRequestMappingHandlerMapping : Mapped "{[/events],methods=[GET || POST],produces=[text/event-stream]}" onto public abstract reactor.core.publisher.Mono<java.lang.Void> org.springframework.web.server.WebHandler.handle(org.springframework.web.server.ServerWebExchange)
2018-05-05 16:50:59.191  INFO 6552 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:8080
2018-05-05 16:50:59.192  INFO 6552 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 8080
2018-05-05 16:50:59.196  INFO 6552 --- [           main] d.e.sample.webflux.WebfluxApplication    : Started WebfluxApplication in 2.608 seconds (JVM running for 3.419)
2018-05-05 16:51:06.918 DEBUG 6552 --- [ctor-http-nio-2] o.s.web.reactive.DispatcherHandler       : Processing POST request for [http://localhost:8080/message/4]
2018-05-05 16:51:06.932 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
2018-05-05 16:51:06.933 DEBUG 6552 --- [ctor-http-nio-2] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@775cdb20]
2018-05-05 16:51:06.967 TRACE 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 16:51:06.967 DEBUG 6552 --- [ctor-http-nio-2] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] o.s.web.reactive.DispatcherHandler       : Processing POST request for [http://localhost:8080/message/4]
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Looking up handler method for path /message/4
2018-05-05 16:51:11.363 DEBUG 6552 --- [ctor-http-nio-3] s.w.r.r.m.a.RequestMappingHandlerMapping : Did not find handler method for [/message/4]
2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Invoking 'org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle' with arguments [org.springframework.web.server.adapter.DefaultServerWebExchange@71f648a3]
2018-05-05 16:51:11.364 TRACE 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 16:51:11.364 DEBUG 6552 --- [ctor-http-nio-3] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method

Why is that?

标签: spring-integrationspring-webflux

解决方案


The reason appears to be that WebFluxInboundEndpoint stops processing POST requests without body in doHandle(), the line

.map(body -> new HttpEntity<>(...)) 

is never executed if there is no request body content:

private Mono<Void> doHandle(ServerWebExchange exchange) {
    return extractRequestBody(exchange)
            .doOnSubscribe(s -> this.activeCount.incrementAndGet())
            .map(body -> new HttpEntity<>(body, exchange.getRequest().getHeaders()))
            .map(entity -> buildMessage(entity, exchange))
            .flatMap(requestMessage -> {
                if (this.expectReply) {
                    return sendAndReceiveMessageReactive(requestMessage)
                            .flatMap(replyMessage -> populateResponse(exchange, replyMessage));
                }
                else {
                    send(requestMessage);
                    return setStatusCode(exchange);
                }
            })
            .doOnTerminate(this.activeCount::decrementAndGet);

}

Workaround: the caller must send any non-empty request body to make it work, e.g. a single quote passed with -d is sufficient:

curl -d ' http://localhost:8080/message/4

With such a request, my log contains the incoming GenericMessage as expected and the /events resource starts producing SSE.

2018-05-05 17:25:24.777 TRACE 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Method [org.springframework.integration.webflux.inbound.WebFluxInboundEndpoint.handle] returned [MonoDefer]
2018-05-05 17:25:24.777 DEBUG 40436 --- [ctor-http-nio-8] o.s.w.r.r.method.InvocableHandlerMethod  : Response fully handled in controller method
2018-05-05 17:25:24.778  INFO 40436 --- [ctor-http-nio-8] o.s.integration.handler.LoggingHandler   : GenericMessage [payload=4, headers={http_requestMethod=POST, Accept=*/*, User-Agent=curl/7.49.1, http_requestUrl=http://localhost:8080/message/4, Host=localhost:8080, id=9a09294d-280a-af3b-0894-23597cf1cb5f, Content-Length=1, contentType=application/x-www-form-urlencoded, timestamp=1525533924778}]

推荐阅读