首页 > 解决方案 > springboot2 +webflux + websocket

问题描述

我在 JDK 11 上使用带有 Webflux 的 Spring boot 2。我编写了以下配置类:

@Configuration
public class WebSocketConfiguration {

    @Autowired
    @Bean
    public HandlerMapping webSocketMapping(final MyWebSocketHandler server) {
        final Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", server);

        final SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setOrder(Ordered.HIGHEST_PRECEDENCE);
        mapping.setUrlMap(map);
        return mapping;
    }
    @Bean
    public WebSocketHandlerAdapter handlerAdapter() {
        return new WebSocketHandlerAdapter();
    }
}

以及以下WebSocketHandler方法:

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    return webSocketSession.send(webSocketSession.receive().
            map(msg -> webSocketSession
                    .textMessage("response:jack ->" + msg.getPayloadAsText())));
}

现在,我可以接收我发送的任何内容,例如:

客户端发送:4545

客户端接收 :response:jack ->4545

我想知道当客户端不给我发消息的时候,如何给客户端推送消息,我需要随时推送消息!</p>

如何随时发送自定义消息而不是使用相同的输入消息进行响应?

标签: spring-bootwebsocketspring-webflux

解决方案


您可以在我的博客文章http://kojotdev.com/2019/08/spring-webflux-websocket-with-vue-js/中了解它。

您需要将您的更改WebSocketHandler为:

private final GreetingsPublisher greetingsPublisher;
private final Flux<String> publisher;

public ReactiveWebSocketHandler(GreetingsPublisher greetingsPublisher) {
    this.greetingsPublisher = greetingsPublisher;
    this.publisher = Flux.create(greetingsPublisher).share();
}

@Override
public Mono<Void> handle(WebSocketSession webSocketSession) {
    final Flux<WebSocketMessage> message = publisher
            .map(greetings -> webSocketSession.textMessage(greetings));

    return webSocketSession.send(message);
}

并添加GreetingPublisher

@Component
public class GreetingsPublisher implements Consumer<FluxSink<String>> {
    private static final Logger log = LoggerFactory.getLogger(GreetingsPublisher.class);

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Executor executor = Executors.newSingleThreadExecutor();

    public boolean push(String greeting) {
        return queue.offer(greeting);
    }

    @Override
    public void accept(FluxSink<String> sink) {
        this.executor.execute(() -> {
            while (true) {
                Try.of(() -> {
                    final String greeting = queue.take();
                    return sink.next(greeting);
                })
                        .onFailure(ex -> log.error("Could not take greeting from queue", ex));

            }
        });
    }
}

它是一个 bean,所以无论你注入它并调用 push 方法,它都会使用 WebSocket 发送消息。例如:

@Controller
public class GreetingsController {

    private final GreetingsPublisher greetingsPublisher;

    public GreetingsController(GreetingsPublisher greetingsPublisher) {
        this.greetingsPublisher = greetingsPublisher;
    }

    @Bean
    RouterFunction<ServerResponse> pushMessage() {
        return route(GET("/push"),
                request -> {
                    greetingsPublisher.push("Send a new message with WebSocket");
                    return ServerResponse.ok().body(fromObject("websocket message sent"));
                });
    }
}

首先连接 WebSocket,然后在 localhost:8080/push 上打开浏览器。应该发送消息。

请注意,这似乎是 Spring Boot 2.1.7 的一个错误,我在我的博客文章中提到了它。


推荐阅读