Blocking timeout exception in Spring reactive WebSocketClient

Problem description

The WebSocket sample project is based on Spring 5.3 (with dependencies managed by Spring Boot 2.5.6).

On the server side, I created a WebSocketHandler to handle WebSocket requests.

@RequiredArgsConstructor
public class MessagetHandler implements WebSocketHandler {

    private final ObjectMapper objectMapper;

    private Sinks.Many<Message> sinks = Sinks.many().replay().limit(2);
    private Flux<Message> outputMessages = sinks.asFlux();

    @Override
    public Mono<Void> handle(WebSocketSession session) {
        var receiveMono = session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .map(this::readIncomingMessage)
                .map(req -> Message.builder().id(UUID.randomUUID()).body(req.message()).sentAt(LocalDateTime.now()).build())
                .log("server receiving::")
//                .subscribe(
//                        data -> sinks.emitNext(data, Sinks.EmitFailureHandler.FAIL_FAST),
//                        error -> sinks.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST)
//                );
                .doOnNext(data -> sinks.emitNext(data, Sinks.EmitFailureHandler.FAIL_FAST))
                .doOnError(error -> sinks.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST))
                .then();

        // TODO: workaround for suspected RxNetty WebSocket client issue
        // https://github.com/ReactiveX/RxNetty/issues/560
        var sendMono = session
                .send(
                        Mono.delay(Duration.ofMillis(500))
                                .thenMany(outputMessages.map(msg -> session.textMessage(toJsonString(msg))))
                )
                .log("server sending::")
                .onErrorResume(throwable -> session.close())
                .then();

        return Mono.zip(receiveMono, sendMono).then();
    }
//...
}

I wrote a test that sends WebSocket requests through a WebSocketClient (ReactorNettyWebSocketClient).

        Sinks.Many<Message> sinks = Sinks.many().replay().limit(2);
        // var latch = new CountDownLatch(1);

        var socketUri = URI.create("ws://localhost:" + port + "/ws/messages");

        WebSocketHandler handler = session -> {

            var receiveMono = session.receive()
                    .map(it -> it.getPayloadAsText())
                    .map(text -> readMessage(text))
                    .log("client receiving::")
                    .doOnNext(data -> sinks.emitNext(data, Sinks.EmitFailureHandler.FAIL_FAST)).then();
            //.subscribe(data -> replayList.add(data));

            var sendMono = session
                    .send(
                            Mono.delay(Duration.ofMillis(100)).thenMany(
                                    Flux.just("message one", "message two").map(m -> session.textMessage(toJsonString(m)))
                            )
                    )
                    .log("client sending::")
                    //.then(Mono.delay(Duration.ofMillis(1000)).then(session.close(CloseStatus.NORMAL)))
                    .then();
            return sendMono.then(receiveMono);
        };

        this.client.execute(socketUri, handler)
                 //       .block(Duration.ofMillis(1000));
                .subscribe();

        // latch.await();
        // assertThat(replayList.size()).isEqualTo(2);

        StepVerifier.create(sinks.asFlux())
                .consumeNextWith(it -> assertThat(it.getBody()).isEqualTo("message one"))
                .consumeNextWith(it -> assertThat(it.getBody()).isEqualTo("message two"))
                .expectComplete()
                .verify();

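If I use this.client.execute(socketUri, handler).subscribe() here, the test always blocks and never exits in the console.

If I use this.client.execute(socketUri, handler).block(Duration.of...) instead, it throws an exception complaining about a blocking timeout. However, I used this approach in an earlier sample project, where it worked in both cases (with subscribe or block); see hantsy/angular-spring-websocket-sample, which is based on Spring Boot 2.4.

Update: If I replace the Sinks with a List to record the messages, it works. In older Spring Boot 2.3 and 2.4 projects I used the legacy ReplayProcessor (Reactor recommends migrating to Sinks), and it worked well. Why does Sinks block my WebSocketClient execution?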
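
The List-based variant mentioned in the update (and hinted at by the commented-out latch and replayList lines in the test) could be sketched roughly as follows. MessageRecorder is a hypothetical helper, not part of Spring or Reactor, and it uses only the JDK. It waits for a fixed number of messages with a timeout rather than for a completion signal. That difference may matter here, because the StepVerifier in the test ends with expectComplete(), and nothing in the code shown ever completes the client-side sink.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical test helper: records messages in a thread-safe list and
// lets the test thread wait until the expected number has arrived.
final class MessageRecorder<T> {

    private final List<T> received = new CopyOnWriteArrayList<>();
    private final CountDownLatch latch;

    MessageRecorder(int expectedCount) {
        this.latch = new CountDownLatch(expectedCount);
    }

    // Intended to be used as a callback, e.g. .doOnNext(recorder::record).
    void record(T message) {
        received.add(message);   // store first, so the list is complete
        latch.countDown();       // before a waiting thread is released
    }

    // Returns true if the expected number of messages arrived within the
    // timeout, false on timeout or if the waiting thread was interrupted.
    boolean await(Duration timeout) {
        try {
            return latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    // Immutable snapshot of everything recorded so far, in arrival order.
    List<T> messages() {
        return List.copyOf(received);
    }
}
```

In the test above, this would presumably be wired in as .doOnNext(recorder::record) in place of the sinks.emitNext call, with assertions on recorder.await(...) and recorder.messages() placed after this.client.execute(socketUri, handler).subscribe(). Because the wait is bounded by both a message count and a timeout, the test can finish even though the WebSocket session stays open.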

Tags: spring-boot, spring-webflux, spring-websocket
