首页 > 解决方案 > 如果使用 Spring + SockJS + STOMP,websocket 消息会被缓存吗?如果没有,如何检测网络断开?

问题描述

我已经通过 websocket 消息在不同时间更新了页面的不同部分。如果与服务器的网络连接由于任何原因(几秒到几天)失败,我需要让页面恢复到正确的状态。我在后端使用 Spring websockets,在前端使用 SockJS 和 STOMP.js(内置 Angular)。

Q 1. 这是否有任何部分缓存正在发送的 websocket 消息(我只使用 websockets 一种方式,从服务器到客户端),然后检测网络故障并在连接恢复时发送存储的消息?(所以这种情况会自动将页面恢复到正确的状态)

Q 2. 否则,我需要以某种方式检测网络连接丢失 - 如何准确执行此操作?(然后我会从前端触发整页重新加载 - 这是我可以轻松做到的一点)

我的后端在 Groovy 中,使用 Spring Websockets,即:

import org.springframework.messaging.simp.SimpMessagingTemplate
SimpMessagingTemplate brokerMessagingTemplate
brokerMessagingTemplate.convertAndSend('/topic/updatepage', pageComponentMessage)

用这个进行配置:

@CompileStatic
@Configuration
@EnableWebSocketMessageBroker
class MySocketConfig extends AbstractWebSocketMessageBrokerConfigurer {

    @Override
    void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry) {
        messageBrokerRegistry.enableSimpleBroker "/queue", "/topic"
        messageBrokerRegistry.setApplicationDestinationPrefixes "/app"
    }

    @Override
    void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
        stompEndpointRegistry.addEndpoint("/stomp").setAllowedOrigins("*").withSockJS()
    }

    @Bean
    GrailsSimpAnnotationMethodMessageHandler grailsSimpAnnotationMethodMessageHandler(
        SubscribableChannel clientInboundChannel,
        MessageChannel clientOutboundChannel,
        SimpMessageSendingOperations brokerMessagingTemplate
    ) {
        def handler = new GrailsSimpAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate)
        handler.destinationPrefixes = ["/app"]
        return handler
    }

    @Bean
    GrailsWebSocketAnnotationMethodMessageHandler grailsWebSocketAnnotationMethodMessageHandler(
        SubscribableChannel clientInboundChannel,
        MessageChannel clientOutboundChannel,
        SimpMessageSendingOperations brokerMessagingTemplate
    ) {
        def handler = new GrailsWebSocketAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate)
        handler.destinationPrefixes = ["/app"]
        return handler
    }

}

和前端Angular代码:

export class MyWSService {
  private sockjsclient = null; // SockJS socket that connects to the server (preferably using a WebSocket)
  private stompClient = null; // Stomp client that handles sending messages over the WebSocket

  subscribeToTopic(topic: string, subInstance: any, callbackfn): any { 

    // SockJS socket connection does not exist yet, set it up:
    if(!this.sockjsclient) {
      this.sockjsclient = new SockJS(myWebsocketUrl);
    }

    // If STOMP instance (to send messages over the socket) does not exist yet, set it up:
    if(!this.stompClient) {

      this.stompClient = Stomp.over(this.sockjsclient);

      this.stompClient.connect({}, () => {

        subInstance.wsSubscription = this.stompClient.subscribe(topic, (message) => callbackfn(message));
      })
    }
    // STOMP instance already exists, so use that existing connection:
    else {
        subInstance.wsSubscription = this.stompClient.subscribe(topic, (message) => callbackfn(message));
      }
  }

  unsubscribeFromTopic(subscription: any) {
    subscription.unsubscribe(); // Unsubscribe from topic
  }
}

谢谢

标签: angularwebsocketspring-websocketstompsockjs

解决方案


这个问题已经在理论上了。即使你有一个缓存,你也不知道你需要保留它们多长时间。

我在一个项目中为接收此功能所做的是将所有用户订阅到他们的专用用户队列,并以队列仅在一定时间后过期的方式配置 RabbitMQ。使用这种方法 - 由于用户队列只有一个订阅者 - 未传递的消息将简单地留在队列中,直到相应的用户获取它。在客户端,一旦遇到连接丢失(通过丢失的心跳),您就可以建立新的 websocket 连接和订阅。一旦您订阅,您就会收到您错过的消息。

为了在 RabbitMQ 中配置用户队列,我实现了一个对帧ChannelInterceptor做出反应的:SUBSCRIBE

public class SubscribeInterceptor implements ChannelInterceptor {

    private static final int EXPIRE_IN_MILLISECONDS = 30 * 1000;

    @Override
    public Message<?> preSend(@NonNull Message<?> message, MessageChannel channel) {
        StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
        if (accessor.getCommand() == StompCommand.SUBSCRIBE) {
            String username = accessor.getUser().getName();
            String queueName = Stream.of(accessor.getDestination().split("/"))
                    .reduce((f, s) -> s)
                    .orElseThrow(IllegalStateException::new);

            accessor.setNativeHeader("x-queue-name", String.format("%s_%s", username, queueName));
            accessor.setNativeHeader("x-expires", Integer.toString(EXPIRE_IN_MILLISECONDS));
            accessor.setNativeHeader("durable", "true");
            accessor.setNativeHeader("auto-delete", "false");
            return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
        }
        return message;

    }
}

推荐阅读