spring-boot - 在 Spring 反应式 WebSocketClient 中阻止超时异常
问题描述
websocket 示例项目基于 Spring 5.3(以及由 Boot 2.5.6 管理的依赖项)。
在服务器端,我创建了一个WebSocketHandlder
来处理 WebSocket 请求。
@RequiredArgsConstructor
public class MessagetHandler implements WebSocketHandler {
private final ObjectMapper objectMapper;
private Sinks.Many<Message> sinks = Sinks.many().replay().limit(2);
private Flux<Message> outputMessages = sinks.asFlux();
@Override
public Mono<Void> handle(WebSocketSession session) {
var receiveMono = session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(this::readIncomingMessage)
.map(req -> Message.builder().id(UUID.randomUUID()).body(req.message()).sentAt(LocalDateTime.now()).build())
.log("server receiving::")
// .subscribe(
// data -> sinks.emitNext(data, Sinks.EmitFailureHandler.FAIL_FAST),
// error -> sinks.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST)
// );
.doOnNext(data -> sinks.emitNext(data, Sinks.EmitFailureHandler.FAIL_FAST))
.doOnError(error -> sinks.emitError(error, Sinks.EmitFailureHandler.FAIL_FAST))
.then();
// TODO: workaround for suspected RxNetty WebSocket client issue
// https://github.com/ReactiveX/RxNetty/issues/560
var sendMono = session
.send(
Mono.delay(Duration.ofMillis(500))
.thenMany(outputMessages.map(msg -> session.textMessage(toJsonString(msg))))
)
.log("server sending::")
.onErrorResume(throwable -> session.close())
.then();
return Mono.zip(receiveMono, sendMono).then();
}
//...
}
我写了一个测试来通过 WebSocketClient() 发送 WebSocket 请求NettReactiveWebSocketClient
。
Sinks.Many<Message> sinks = Sinks.many().replay().limit(2);
// var latch = new CountDownLatch(1);
var socketUri = URI.create("ws://localhost:" + port + "/ws/messages");
WebSocketHandler handler = session -> {
var receiveMono = session.receive()
.map(it -> it.getPayloadAsText())
.map(text -> readMessage(text))
.log("client receiving::")
.doOnNext(data -> sinks.emitNext(data, Sinks.EmitFailureHandler.FAIL_FAST)).then();
//.subscribe(data -> replayList.add(data));
var sendMono = session
.send(
Mono.delay(Duration.ofMillis(100)).thenMany(
Flux.just("message one", "message two").map(m -> session.textMessage(toJsonString(m)))
)
)
.log("client sending::")
//.then(Mono.delay(Duration.ofMillis(1000)).then(session.close(CloseStatus.NORMAL)))
.then();
return sendMono.then(receiveMono);
};
this.client.execute(socketUri, handler)
// .block(Duration.ofMillis(1000));
.subscribe();
// latch.await();
// assertThat(replayList.size()).isEqualTo(2);
StepVerifier.create(sinks.asFlux())
.consumeNextWith(it -> assertThat(it.getBody()).isEqualTo("message one"))
.consumeNextWith(it -> assertThat(it.getBody()).isEqualTo("message two"))
.expectComplete()
.verify();
如果我this.client.execute(socketUri, handler).subscribe()
在这里使用,它总是被阻止并且没有退出控制台。
如果我用过 ,它会抛出一个抱怨阻塞超时this.client.execute(socketUri, handler).block(Duratoin.of...)
的异常。但是我在以前的示例项目中使用了这种方法,它适用于两种情况(使用或),检查hantsy/angular-spring-websocket-sample,这个示例项目基于 Spring Boot 2.4。subscribe
block
更新:如果我Sinks
用 a替换List
来记录消息,它就可以工作。我在旧的 Spring Boot 2.3、2.4 项目中使用了 legacy ReplayProcesssor
(Reactor 建议更新到),效果很好。Sinks
为什么要Sinks
阻止我的WebSocketClient
执行?
解决方案
推荐阅读
- php - Phalcon 4 文档控制器处理程序问题
- css - CSS :not() 如果它是具有 class="foo" 的容器的子元素,我如何不选择任何元素?
- kotlin - 如何使用 Kaspresso 框架在移动模拟器上执行滚动功能以实现测试自动化?
- dpkg-buildpackage - dpkg-buildpackage 命令不适用于 -P 参数
- entity-framework - 使用(托管)后台服务时的 .NET Core / EF Core (SQL) Max Pool 问题
- c# - Npgsql 4.0 没有明确返回字典参数
- r - 对多个变量使用 dplyr 和 ggplot
- python - scipy 行搜索的问题
- visual-c++ - Visual Studio 2017 nmake 缺少 io.h
- single-page-application - OfficeJS Excel 集成中的 OidcClient 错误