spring-boot - springboot2 +webflux + websocket
问题描述
我在 JDK 11 上使用带有 Webflux 的 Spring boot 2。我编写了以下配置类:
@Configuration
public class WebSocketConfiguration {
@Autowired
@Bean
public HandlerMapping webSocketMapping(final MyWebSocketHandler server) {
final Map<String, WebSocketHandler> map = new HashMap<>();
map.put("/echo", server);
final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
mapping.setUrlMap(map);
return mapping;
}
@Bean
public WebSocketHandlerAdapter handlerAdapter() {
return new WebSocketHandlerAdapter();
}
}
以及以下WebSocketHandler
方法:
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
return webSocketSession.send(webSocketSession.receive().
map(msg -> webSocketSession
.textMessage("response:jack ->" + msg.getPayloadAsText())));
}
现在,我可以接收我发送的任何内容,例如:
客户端发送:4545
客户端接收 :response:jack ->4545
我想知道当客户端不给我发消息的时候,如何给客户端推送消息,我需要随时推送消息!</p>
如何随时发送自定义消息而不是使用相同的输入消息进行响应?
解决方案
您可以在我的博客文章http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/中了解它。
您需要将您的更改WebSocketHandler
为:
private final GreetingsPublisher greetingsPublisher;
private final Flux<String> publisher;
public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
this.greetingsPublisher = greetingsPublisher;
this.publisher = Flux.create(greetingsPublisher).share();
}
@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
final Flux<WebSocketMessage> message = publisher
.map(greetings -> webSocketSession.textMessage(greetings));
return webSocketSession.send(message);
}
并添加GreetingPublisher
@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {
private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
private final Executor executor = Executors.newSingleThreadExecutor();
public boolean push(String greeting) {
return queue.offer(greeting);
}
@Override
public void accept(FluxSink<String> sink) {
this.executor.execute(() -> {
while (true) {
Try.of(() -> {
final String greeting = queue.take();
return sink.next(greeting);
})
.onFailure(ex -> log.error("Could not take greeting from queue", ex));
}
});
}
}
它是一个 bean,所以无论你注入它并调用 push 方法,它都会使用 WebSocket 发送消息。例如:
@Controller
public class GreetingsController {
private final GreetingsPublisher greetingsPublisher;
public GreetingsController(GreetingsPublisher greetingsPublisher) {
this.greetingsPublisher = greetingsPublisher;
}
@Bean
RouterFunction<ServerResponse> pushMessage() {
return route(GET("/push"),
request -> {
greetingsPublisher.push("Send a new message with WebSocket");
return ServerResponse.ok().body(fromObject("websocket message sent"));
});
}
}
首先连接 WebSocket,然后在 localhost:8080/push 上打开浏览器。应该发送消息。
请注意,这似乎是 Spring Boot 2.1.7 的一个错误,我在我的博客文章中提到了它。
推荐阅读
- python - Python Selenium 动态链接错误信息
- c# - 使用 json 补丁找不到路径段指定的目标位置
- c - 从文件中读取邻接矩阵 - 竞争性编程
- widget - 在 GridLayout 列中对齐小部件并使所有小部件在 kivy 中的宽度相同大小时出现问题,有没有办法解决这个问题?
- python - 如何使用 shamirs 秘密共享存储和拆分共享 aes 密钥和 iv?
- sql - 如何在 django 中单独获取特定文件?
- plsql - 在此上下文中不允许出现错误“表、视图或序列引用“EMPLOYEES.EMP_NAME””
- grafana - 自定义“Istio Grafana Dashboard JSON Document”以创建警报
- amazon-web-services - 在 DynamoDB GlobalTable CloudFormation 模板中添加标签
- cmake - 如何查看 cmake 的 add_custom_command 的输出