首页 > 解决方案 > Spring Boot Reactive Websoket - 阻止流量,直到收到来自客户端的所有信息

问题描述

我正在使用响应式 websocket(在 spring boot 2.1.0 上),但我在尝试阻止等待客户端信息的输出通量时遇到问题。

我知道阻塞不是处理反应流的正确方法,但我需要在继续之前从客户端接收一些信息(即:身份验证密钥、ID),有一种可接受的方法来以反应方式管理它吗?

例如:我希望客户端发送授权密钥和订阅 ID(仅订阅特定事件),并且仅当我拥有这两个信息时才发送输出通量,或者如果信息无效则关闭会话

我试图管理句柄方法中的检查

     webSocketSession.receive().subscribe(inMsg -> {

            if(!inMsg.getPayloadAsText().equals("test-key")) {
                log.info("AUTHORIZATION ERROR");
                webSocketSession.close();
            }
        });

但是这种方式不起作用并且不正确,因为它是以“异步”方式管理会话终止,无论如何,当接收到带有错误密钥的消息时,会话仍然保持活动状态

另一种方法是使用内存存储来保持“会话”,以跟踪接收到的信息并在业务逻辑级别进行处理

我一直在寻找一种正确的方法来以被动的方式管理它

我的出发点是这个例子:http ://www.baeldung.com/spring-5-reactive-websockets

提前致谢

附加信息:

使用示例https://github.com/eugenp/tutorials/tree/master/spring-5-reactive/src/main/java/com/baeldung/reactive/websocket

我创建了一个基本的 Spring Boot 应用程序,并为 websocket 添加了一个配置类:

@Configuration 
class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/event-emitter-test", new MyWebSocketHandler());

        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        mapping.setOrder(-1); // before annotated controllers
        return mapping;
    }

    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

以下是包含主要 websocket 方法的主类(用我的实际代码修改):

@Component
public class MyWebSocketHandler  implements WebSocketHandler {

    @Autowired
    private WebSocketHandler webSocketHandler;

    @Bean
    public HandlerMapping webSocketHandlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/event-emitter-test", webSocketHandler);

        SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping();
        handlerMapping.setOrder(1);
        handlerMapping.setUrlMap(map);
        return handlerMapping;
    }

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {

       webSocketSession.receive().subscribe(inMsg -> {

        if(!inMsg.getPayloadAsText().equals("test-key")) {
            // log.info("AUTHORIZATION ERROR");
            webSocketSession.close();
        }
    });

    List<String> data = new ArrayList<String>(Arrays.asList("{A}", "{B}", "{C}"));
    Flux<String> intervalFlux = Flux
                                .interval(Duration.ofMillis(500))
                                .map(tick -> {
                                    if (tick < data.size())
                                        return "item " + tick + ": " + data.get(tick.intValue());
                                    return "Done (tick == data.size())";
                                });

        return webSocketSession.send(intervalFlux
          .map(webSocketSession::textMessage));
    }


}

标签: spring-bootwebsocketreactive-programmingspring-webflux

解决方案


您不应该subscribe也不应该block在反应式管道中 - 您可以发现您在这样的管道中,因为处理程序方法的返回类型是 a Mono<Void>,这意味着传入消息处理完成的信号。

在您的情况下,您可能想要阅读第一条消息,检查它是否包含您期望的订阅信息,然后发送消息。

public class TestWebSocketHandler implements WebSocketHandler {

    public Mono<Void> handle(WebSocketSession session) {

        return session.receive()
                .map(WebSocketMessage::getPayloadAsText)
                .flatMap(msg -> {
                    SubscriptionInfo info = extract(msg);
                    if (info == null) {
                        return session.close();
                    }
                    else {
                        Mono<WebSocketMessage> message = Mono.just(session.textMessage("message"));
                        return session.send(message);
                    }
                })
                .then();
    }

    SubscriptionInfo extract(String message) {
        //
    }

    class SubscriptionInfo {
        //
    }
}

推荐阅读