Spring STOMP slow connection

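Problem description

I am seeing a slow STOMP connection between two Spring microservices: the client takes about 5 minutes to send the STOMP CONNECT message. Both microservices run on the same node.

Spring Boot version: 2.1.3.RELEASE

The WebSocket server is configured as follows: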

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-subscribe").setAllowedOrigins("*").withSockJS();
    }
}

The client is configured like this:

@Component
public class WebsocketClient {

    private GlobalConfiguration globalConfiguration;
    private WebsocketListener websocketListener;

    private WebSocketStompClient stompClient;

    @Autowired
    private JwtTokenGenerator jwtTokenGenerator;

    private ApplicationContext appContext;

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketClient.class);

    @Autowired
    public WebsocketClient(ApplicationContext context, GlobalConfiguration globalConfiguration) {
        Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
        List<Transport> transports = Collections.singletonList(webSocketTransport);

        SockJsClient sockJsClient = new SockJsClient(transports);
        sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());

        this.stompClient = new WebSocketStompClient(sockJsClient);

        this.globalConfiguration = globalConfiguration;
        this.websocketListener = new WebsocketListener(context);
    }

    public ListenableFuture<StompSession> connectToWebsocket() {
        String url = globalConfiguration.getOrchestratorWs();

        StompHeaders connectHeaders = new StompHeaders();

        WebSocketHttpHeaders webSocketHttpHeaders = new WebSocketHttpHeaders();
        webSocketHttpHeaders.add("Authorization",jwtTokenGenerator.token);

        LOGGER.info("webSocketHttpHeaders: " + webSocketHttpHeaders.toString());
        LOGGER.info("url: " + url);

        this.stompClient.setMessageConverter(new MappingJackson2MessageConverter());
        return stompClient.connect(url,webSocketHttpHeaders,connectHeaders, websocketListener);
    }
}

On the client side, we have this WebSocket listener:

public class WebsocketListener extends StompSessionHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketListener.class);


    private GlobalConfiguration globalConfiguration;
    private ApplicationContext appContext;


    public WebsocketListener(ApplicationContext appContext) {
        this.appContext = appContext;
        this.globalConfiguration = appContext.getBean(GlobalConfiguration.class);
    }

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        LOGGER.info("New session established : " + globalConfiguration.getId());
        session.subscribe("/topic/injections/"+globalConfiguration.getId(), this);
        LOGGER.info("Subscribed to /topic/injections/"+globalConfiguration.getId());
      //  session.send("/app/chat", getSampleMessage());
      //  logger.info("Message sent to websocket server");
    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        LOGGER.error("Got an exception", exception);
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return Map.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        LOGGER.info("Injection Received : " + payload);
        InjectionHandler injectionHandler = new InjectionHandler(appContext,(HashMap) payload);
        injectionHandler.start();
    }
}

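afterConnected is called about 5 minutes after the connection to ws://10.2.0.43:7071/ws-subscribe/513/8306a7ac357847678795b049450fb6c5/websocket is opened.

Looking at the source code of the STOMP library, I can see that the microservice is stuck for those 5 minutes in the following code: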

public ListenableFuture<StompSession> connect(URI url, @Nullable WebSocketHttpHeaders handshakeHeaders, @Nullable StompHeaders connectHeaders, StompSessionHandler sessionHandler) {
    Assert.notNull(url, "'url' must not be null");
    ConnectionHandlingStompSession session = this.createSession(connectHeaders, sessionHandler);
    WebSocketStompClient.WebSocketTcpConnectionHandlerAdapter adapter = new WebSocketStompClient.WebSocketTcpConnectionHandlerAdapter(session);
    this.getWebSocketClient().doHandshake(adapter, handshakeHeaders, url).addCallback(adapter);
    return session.getSessionFuture();
}

Specifically, it is stuck at the session.getSessionFuture() call. Any ideas about what could be delaying the CONNECT message sent by the client?

Tags: spring, websocket, stomp

Solution
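
A possible first step is to find out whether the time is spent inside the connect(...) call itself or while waiting for the returned future to complete. The sketch below is only an illustration using the JDK: FutureTiming and measure are hypothetical names, not part of Spring, and a CompletableFuture stands in for the ListenableFuture<StompSession> returned by the client in the question.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper (not a Spring class): separates the time an asynchronous
// call needs to hand back its future from the time that future needs to complete.
final class FutureTiming {

    /** Elapsed milliseconds: [0] until the future is returned, [1] until it completes. */
    static <T> long[] measure(Supplier<CompletableFuture<T>> asyncCall) {
        long start = System.nanoTime();
        CompletableFuture<T> future = asyncCall.get();  // any blocking inside the call shows up here
        long returned = System.nanoTime();
        future.join();                                  // waits until the result is available
        long completed = System.nanoTime();
        return new long[] {
            TimeUnit.NANOSECONDS.toMillis(returned - start),
            TimeUnit.NANOSECONDS.toMillis(completed - start)
        };
    }

    public static void main(String[] args) {
        // Stand-in for a slow handshake: the future completes after roughly 300 ms.
        long[] ms = measure(() -> CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "CONNECTED";
        }));
        System.out.println("future returned after " + ms[0] + " ms");
        System.out.println("future completed after " + ms[1] + " ms");
    }
}
```

With the client from the question, the same measurement could probably be taken with something like FutureTiming.measure(() -> websocketClient.connectToWebsocket().completable()), assuming an injected websocketClient bean. A large first value would point at blocking work before the future is returned; a large second value would point at the handshake or the CONNECT/CONNECTED exchange.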

