Pushing data to a websocket using Reactive Spring and WebFlux

Problem description

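I am trying to send data to a JS client, but for some reason none of the `.textMessage` calls have any effect, and it looks like the user simply gets disconnected. I took `return Mono.zip(inputMessage, outputMessage).then();` from the reactive documentation. The basic idea is to send a message whenever something is published on Redis, but I cannot even connect properly and receive messages from the WebSocket.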

    @Override
    public Mono<Void> handle(WebSocketSession webSocketSession) {
        HttpHeaders headers = webSocketSession.getHandshakeInfo().getHeaders();
        final String proxyIp = headers.getFirst("x-forwarded-for");
        final String remoteIp = headers.getFirst("x-real-ip");
        final String ip = (proxyIp == null || proxyIp.isEmpty()) ? remoteIp : proxyIp;
        String rawCookieHeader = headers.getFirst("cookie");
        if (rawCookieHeader == null) {
            webSocketSession.textMessage("Missing cookie header");
            // return webSocketSession.close(CloseStatus.PROTOCOL_ERROR);
        }

        final Map<String, String> cookieMap = new HashMap<>();
        CookieParser parser = new CookieParser(rawCookieHeader);
        parser.iterator().forEachRemaining(cookie -> cookieMap.put(cookie.getName(), cookie.getValue()));

        String userId = cookieMap.getOrDefault("userId", "");

        // Handle too many sessions open
        RedisAtomicInteger numberOfOpenWebSockets = getUserWebsocket(userId, chatCode);
        if (numberOfOpenWebSockets.get() >= 3) {
            webSocketSession.textMessage("Too many websocket connections opened by this account");
            return webSocketSession.close(CloseStatus.SERVICE_OVERLOAD);
        }

        Mono<Void> outputMessage = webSocketSession
                .send(messageDirectProcessor.flatMap(JsonConverter::toJson).map(webSocketSession::textMessage)
                        .doOnError(e -> log.info("Error Occurred while sending message to client.", e)));

        // When a user sends a message
        Mono<Void> inputMessage = webSocketSession.receive()
                .flatMap(msg -> redisGlobalPublisher.processMessage(msg.getPayloadAsText(), userId, ip, sessionId))
                .doOnSubscribe(subscription -> {
                    log.info("User '{}' Disconnected. Connections: {}, Active Users: {}", userId,
                            numberOfOpenWebSockets.incrementAndGet(), activeUserCounter.incrementAndGet());
                }).doOnError(throwable -> log.info("Error Occurred while sending message to Redis.", throwable))
                .doFinally(signalType -> {
                    log.info("User '{}' Disconnected. Connections: {}, Active Users: {}", userId,
                            numberOfOpenWebSockets.decrementAndGet(), activeUserCounter.decrementAndGet());
                }).then();

        return Mono.zip(inputMessage, outputMessage).then();
    }

The client is super simple:

        const ws = new WebSocket("ws://localhost:8082/ws")
        ws.onopen = (e) => {
          console.log(e);
          //@ts-ignore
          this.connectedToWebSocket = true;
          // console.log(JSON.stringify({"command": "subscribe","identifier":`{\"channel\":\"MESSAGE-${cookies.get('userId')}\"}`}));
          // ws.send(JSON.stringify({"command": "subscribe","identifier":`{\"channel\":\"MESSAGE-${cookies.get('userId')}\"}`}))
          console.log("Successfully connected to the echo websocket server...")
        };
        ws.onmessage = (event) => {
          console.log(event);
        }
        ws.onclose = (e) => {
          console.log(e);
        };
        ws.onerror = (e) => {
          console.log(e);
        }

I have no idea why inputMessage and outputMessage are not being pushed correctly. Any help is appreciated.

Tags: java, websocket, spring-webflux, project-reactor

Solution
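
One thing that stands out, offered as a likely cause rather than a confirmed diagnosis: `webSocketSession.textMessage(...)` only creates a `WebSocketMessage`. Nothing goes over the wire until that message is part of the publisher handed to `webSocketSession.send(...)` and the resulting `Mono` is part of what `handle` returns. Both rejection branches in the question build a message and then discard it. A rejection would look more like `return webSocketSession.send(Mono.just(webSocketSession.textMessage(reason))).then(webSocketSession.close(status));`. Also, with the `return` commented out, a missing cookie header still reaches `new CookieParser(rawCookieHeader)` with `null`, which may throw and end the session.

To keep that decision testable without Spring, here is a minimal, dependency-free sketch. `HandshakeCheck`, `parseCookies` and `rejectionReason` are hypothetical names, the hand-rolled parser only stands in for the `CookieParser` used in the question, and rejecting an empty `userId` is my assumption, not something the posted code does.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper (not a Spring class): decides whether a handshake should be rejected. */
final class HandshakeCheck {

    private HandshakeCheck() {
    }

    /** Parses a raw Cookie header ("a=1; b=2") into a map; a null or blank header gives an empty map. */
    static Map<String, String> parseCookies(String rawCookieHeader) {
        Map<String, String> cookies = new LinkedHashMap<>();
        if (rawCookieHeader == null || rawCookieHeader.trim().isEmpty()) {
            return cookies;
        }
        for (String pair : rawCookieHeader.split(";")) {
            int eq = pair.indexOf('=');
            if (eq <= 0) {
                continue; // no '=' at all, or nothing before it
            }
            String name = pair.substring(0, eq).trim();
            if (name.isEmpty()) {
                continue; // whitespace-only cookie name
            }
            cookies.put(name, pair.substring(eq + 1).trim());
        }
        return cookies;
    }

    /** Returns the text to send before closing, or empty if the session may proceed. */
    static Optional<String> rejectionReason(String rawCookieHeader, int openConnections, int maxConnections) {
        if (rawCookieHeader == null) {
            return Optional.of("Missing cookie header");
        }
        String userId = parseCookies(rawCookieHeader).getOrDefault("userId", "");
        if (userId.isEmpty()) {
            // Assumption: an anonymous session is rejected; the question's code lets it through.
            return Optional.of("Missing userId cookie");
        }
        if (openConnections >= maxConnections) {
            return Optional.of("Too many websocket connections opened by this account");
        }
        return Optional.empty();
    }
}
```

In `handle`, if `rejectionReason(...)` returns a value you would send it and close as in the one-liner above; otherwise carry on with the `receive()` / `send()` pipeline.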

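A second thing worth checking, again as a guess from the posted code: the limit is tested with `numberOfOpenWebSockets.get() >= 3` at the top of `handle`, but the counter is only incremented later in `doOnSubscribe`, so two handshakes arriving together can both pass. Because the counter lives in Redis, a value left over from sessions that never reached `doFinally` (for example if the server was killed while clients were connected) would also make every new connection close immediately. Note too that the `doOnSubscribe` callback logs "Disconnected" while incrementing, so the log may report a disconnect for every successful connect. The sketch below reserves a slot atomically and rolls back when the limit is exceeded. `ConnectionLimiter` is a hypothetical class, and `AtomicInteger` stands in for `RedisAtomicInteger`, which is already used with `incrementAndGet()` / `decrementAndGet()` in the question.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical per-user connection limiter; AtomicInteger stands in for RedisAtomicInteger. */
final class ConnectionLimiter {

    private final AtomicInteger open = new AtomicInteger();
    private final int maxConnections;

    ConnectionLimiter(int maxConnections) {
        this.maxConnections = maxConnections;
    }

    /** Reserves a slot; returns false, with no net change to the count, when the limit is reached. */
    boolean tryAcquire() {
        if (open.incrementAndGet() > maxConnections) {
            open.decrementAndGet(); // roll back the reservation made on the line above
            return false;
        }
        return true;
    }

    /** Releases a slot previously obtained with tryAcquire(). */
    void release() {
        open.decrementAndGet();
    }

    /** Current number of reserved slots. */
    int openConnections() {
        return open.get();
    }
}
```

In the handler, `release()` would belong in `doFinally`, and only for sessions where `tryAcquire()` returned true, so a rejected handshake never decrements the counter.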
