java - 使用 Reactive Spring 和 WebFlux 将数据推送到 websocket
问题描述
我正在尝试将数据发送到 JS 客户端,但由于某种原因,没有任何.textMessage
方法有效,而且看起来用户只是断开了连接,我return Mono.zip(inputMessage, outputMessage).then();
从反应文档中得到了。基本思想是在 redis 上发布某些内容时发送消息,但我什至无法从 WebSocket 正确连接和接收消息。
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
HttpHeaders headers = webSocketSession.getHandshakeInfo().getHeaders();
final String proxyIp = headers.getFirst("x-forwarded-for");
final String remoteIp = headers.getFirst("x-real-ip");
final String ip = (proxyIp == null || proxyIp.isEmpty()) ? remoteIp : proxyIp;
String rawCookieHeader = headers.getFirst("cookie");
if (rawCookieHeader == null) {
webSocketSession.textMessage("Missing cookie header");
// return webSocketSession.close(CloseStatus.PROTOCOL_ERROR);
}
final Map<String, String> cookieMap = new HashMap<>();
CookieParser parser = new CookieParser(rawCookieHeader);
parser.iterator().forEachRemaining(cookie -> cookieMap.put(cookie.getName(), cookie.getValue()));
String userId = cookieMap.getOrDefault("userId", "");
// Handle too many sessions open
RedisAtomicInteger numberOfOpenWebSockets = getUserWebsocket(userId, chatCode);
if (numberOfOpenWebSockets.get() >= 3) {
webSocketSession.textMessage("Too many websocket connections opened by this account");
return webSocketSession.close(CloseStatus.SERVICE_OVERLOAD);
}
Mono<Void> outputMessage = webSocketSession
.send(messageDirectProcessor.flatMap(JsonConverter::toJson).map(webSocketSession::textMessage)
.doOnError(e -> log.info("Error Occurred while sending message to client.", e)));
// When a user sends a message
Mono<Void> inputMessage = webSocketSession.receive()
.flatMap(msg -> redisGlobalPublisher.processMessage(msg.getPayloadAsText(), userId, ip, sessionId))
.doOnSubscribe(subscription -> {
log.info("User '{}' Disconnected. Connections: {}, Active Users: {}", userId,
numberOfOpenWebSockets.incrementAndGet(), activeUserCounter.incrementAndGet());
}).doOnError(throwable -> log.info("Error Occurred while sending message to Redis.", throwable))
.doFinally(signalType -> {
log.info("User '{}' Disconnected. Connections: {}, Active Users: {}", userId,
numberOfOpenWebSockets.decrementAndGet(), activeUserCounter.decrementAndGet());
}).then();
return Mono.zip(inputMessage, outputMessage).then();
}
客户端超级简单:
const ws = new WebSocket("ws://localhost:8082/ws")
ws.onopen = (e) => {
console.log(e);
//@ts-ignore
this.connectedToWebSocket = true;
// console.log(JSON.stringify({"command": "subscribe","identifier":`{\"channel\":\"MESSAGE-${cookies.get('userId')}\"}`}));
// ws.send(JSON.stringify({"command": "subscribe","identifier":`{\"channel\":\"MESSAGE-${cookies.get('userId')}\"}`}))
console.log("Successfully connected to the echo websocket server...")
};
ws.onmessage = (event) => {
console.log(event);
}
ws.onclose = (e) => {
console.log(e);
};
ws.onerror = (e) => {
console.log(e);
}
我完全不知道为什么 input 和 outputMessage 没有被正确推送。任何帮助表示赞赏。
解决方案
推荐阅读
- powershell - 具有多个路径的 Get-ChildItem - 如果目录丢失,则会出错
- c# - C# stream Deserialize way to loop?
- python - Python TypeError: list indices must be integers or slices, not tuple. Caesar Cypher Decoder
- jquery - AngularDatatables: How to check all checkbox records in current page
- .htaccess - 如何重定向 URL,然后使用 htaccess 将其重写为相同的 url?
- mongodb - In MongoDB, how to update all the documents by increasing a value
- scala - Spark accumulator used in transformation in ML lib
- asp.net-core-3.1 - HTTP 错误 500.21 - 内部服务器错误处理程序“aspNetCore”的模块列表中有一个错误模块“ManagedPipelineHandler”
- firebase - (Flutter) 如何使用 Cloud Functions for Firebase 从 Firestore 检索文档
- python - 如何将 datetime.datetime 对象转换为浮点数?