spring - ConnectableFlux 中如何向订阅者广播消息
问题描述
我是 Spring WebFlux 的新手,我在此链接中找到了使用 Spring WebFlux 和 Websocket 的聊天演示:
https://blog.monkey.codes/how-to-build-a-chat-app-using-webflux-websockets-react/
根据文章,它说:
应用程序的关键是将 WebSocketSession 相互连接起来。这是通过将每个会话的传入消息流连接到全局发布者来实现的。另一方面,每个会话都订阅由全局发布者生成的消息。
反应堆配置:
@Bean
public UnicastProcessor<Event> eventPublisher(){
return UnicastProcessor.create();
}
@Bean
public Flux<Event> events(UnicastProcessor<Event> eventPublisher) {
return eventPublisher
.replay(25)
.autoConnect();
}
@Bean
public HandlerMapping webSocketMapping(UnicastProcessor<Event> eventPublisher, Flux<Event> events) {
Map<String, Object> map = new HashMap<>();
map.put("/websocket/chat", new ChatSocketHandler(eventPublisher, events));
SimpleUrlHandlerMapping simpleUrlHandlerMapping = new SimpleUrlHandlerMapping();
simpleUrlHandlerMapping.setUrlMap(map);
//Without the order things break :-/
simpleUrlHandlerMapping.setOrder(10);
return simpleUrlHandlerMapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
聊天套接字处理程序:
@Override
public Mono<Void> handle(WebSocketSession session) {
WebSocketMessageSubscriber subscriber = new WebSocketMessageSubscriber(eventPublisher);
session.receive()
.map(WebSocketMessage::getPayloadAsText)
.map(this::toEvent)
.subscribe(subscriber::onNext, subscriber::onError, subscriber::onComplete);
return session.send(outputEvents.map(session::textMessage));
}
private static class WebSocketMessageSubscriber {
private UnicastProcessor<Event> eventPublisher;
private Optional<Event> lastReceivedEvent = Optional.empty();
public WebSocketMessageSubscriber(UnicastProcessor<Event> eventPublisher) {
this.eventPublisher = eventPublisher;
}
public void onNext(Event event) {
lastReceivedEvent = Optional.of(event);
eventPublisher.onNext(event);
}
public void onError(Throwable error) {
//TODO log error
error.printStackTrace();
}
public void onComplete() {
lastReceivedEvent.ifPresent(event -> eventPublisher.onNext(
Event.type(USER_LEFT)
.withPayload()
.user(event.getUser())
.build()));
}
}
如果一个用户发送一条消息,那么其他用户可以看到这一点,因为它已广播给所有用户。但是当我阅读代码时,我不明白 ConnectableFlux(事件)如何将消息发送到所有 Web 套接字会话。我还尝试评论 UserStats 类的代码,但它仍然可以正常工作。谁能给我解释一下?
解决方案
通过 Websocket 从服务器向浏览器发送消息的代码是:
session.send(outputEvents.map(session::textMessage))
换句话说,每当一个事件(消息)被推送到 outputEvents 通量上时,它将通过链接到会话的 Websocket 发送到浏览器。
它像广播一样起作用的原因是,为每个新的 WebSocketSession 创建的每个 WebSocketMessageSubscriber 都会将它接收到的所有内容(从浏览器到服务器)发布到 eventPublisher 上,这使得它在 outputEvents Flux 上可见。
public void onNext(Event event) {
lastReceivedEvent = Optional.of(event);
eventPublisher.onNext(event);
}
outputEvents Flux 在所有 WebSocketMessageSubscribers 之间共享,并且通过此 Flux 接收到的所有事件都通过 Websocket 发送回浏览器,包括它从该 Websocket 接收到的事件。
下图中的“全局消息流”代表 outputEvents Flux。
推荐阅读
- python - 使用循环抓取多个网站
- javascript - 使用 getEntry() 帮助程序到达 Contentful 的 API 时出现问题:未从已找到条目的承诺中返回任何内容
- mongodb - 将 CSV 文件导入 MongoDB 时如何防止更改 UUID?
- javascript - 通过复选框更新Javascript进度条?
- sql - 在最后一个斜线后获得 5 个字符
- javascript - 在嵌套的 Promise 中捕获所有块
- docker - Docker 灯重定向 https
- excel - 在ms excel中计算整行中连续值序列的数量
- excel - 如何使用来自不同列的值在 Excel 的 XY 散点图中添加不同的标记?
- typescript - 如何使用 Jest 和 TypeScript 正确存根/模拟 AWS SecretManager