angular - 如果使用 Spring + SockJS + STOMP,websocket 消息会被缓存吗?如果没有,如何检测网络断开?
问题描述
我已经通过 websocket 消息在不同时间更新了页面的不同部分。如果与服务器的网络连接由于任何原因(几秒到几天)失败,我需要让页面恢复到正确的状态。我在后端使用 Spring websockets,在前端使用 SockJS 和 STOMP.js(内置 Angular)。
Q 1. 这是否有任何部分缓存正在发送的 websocket 消息(我只使用 websockets 一种方式,从服务器到客户端),然后检测网络故障并在连接恢复时发送存储的消息?(所以这种情况会自动将页面恢复到正确的状态)
Q 2. 否则,我需要以某种方式检测网络连接丢失 - 如何准确执行此操作?(然后我会从前端触发整页重新加载 - 这是我可以轻松做到的一点)
我的后端在 Groovy 中,使用 Spring Websockets,即:
import org.springframework.messaging.simp.SimpMessagingTemplate
SimpMessagingTemplate brokerMessagingTemplate
brokerMessagingTemplate.convertAndSend('/topic/updatepage', pageComponentMessage)
用这个进行配置:
@CompileStatic
@Configuration
@EnableWebSocketMessageBroker
class MySocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
@Override
void configureMessageBroker(MessageBrokerRegistry messageBrokerRegistry) {
messageBrokerRegistry.enableSimpleBroker "/queue", "/topic"
messageBrokerRegistry.setApplicationDestinationPrefixes "/app"
}
@Override
void registerStompEndpoints(StompEndpointRegistry stompEndpointRegistry) {
stompEndpointRegistry.addEndpoint("/stomp").setAllowedOrigins("*").withSockJS()
}
@Bean
GrailsSimpAnnotationMethodMessageHandler grailsSimpAnnotationMethodMessageHandler(
SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel,
SimpMessageSendingOperations brokerMessagingTemplate
) {
def handler = new GrailsSimpAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate)
handler.destinationPrefixes = ["/app"]
return handler
}
@Bean
GrailsWebSocketAnnotationMethodMessageHandler grailsWebSocketAnnotationMethodMessageHandler(
SubscribableChannel clientInboundChannel,
MessageChannel clientOutboundChannel,
SimpMessageSendingOperations brokerMessagingTemplate
) {
def handler = new GrailsWebSocketAnnotationMethodMessageHandler(clientInboundChannel, clientOutboundChannel, brokerMessagingTemplate)
handler.destinationPrefixes = ["/app"]
return handler
}
}
和前端Angular代码:
export class MyWSService {
private sockjsclient = null; // SockJS socket that connects to the server (preferably using a WebSocket)
private stompClient = null; // Stomp client that handles sending messages over the WebSocket
subscribeToTopic(topic: string, subInstance: any, callbackfn): any {
// SockJS socket connection does not exist yet, set it up:
if(!this.sockjsclient) {
this.sockjsclient = new SockJS(myWebsocketUrl);
}
// If STOMP instance (to send messages over the socket) does not exist yet, set it up:
if(!this.stompClient) {
this.stompClient = Stomp.over(this.sockjsclient);
this.stompClient.connect({}, () => {
subInstance.wsSubscription = this.stompClient.subscribe(topic, (message) => callbackfn(message));
})
}
// STOMP instance already exists, so use that existing connection:
else {
subInstance.wsSubscription = this.stompClient.subscribe(topic, (message) => callbackfn(message));
}
}
unsubscribeFromTopic(subscription: any) {
subscription.unsubscribe(); // Unsubscribe from topic
}
}
谢谢
解决方案
这个问题已经在理论上了。即使你有一个缓存,你也不知道你需要保留它们多长时间。
我在一个项目中为接收此功能所做的是将所有用户订阅到他们的专用用户队列,并以队列仅在一定时间后过期的方式配置 RabbitMQ。使用这种方法 - 由于用户队列只有一个订阅者 - 未传递的消息将简单地留在队列中,直到相应的用户获取它。在客户端,一旦遇到连接丢失(通过丢失的心跳),您就可以建立新的 websocket 连接和订阅。一旦您订阅,您就会收到您错过的消息。
为了在 RabbitMQ 中配置用户队列,我实现了一个对帧ChannelInterceptor
做出反应的:SUBSCRIBE
public class SubscribeInterceptor implements ChannelInterceptor {
private static final int EXPIRE_IN_MILLISECONDS = 30 * 1000;
@Override
public Message<?> preSend(@NonNull Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
if (accessor.getCommand() == StompCommand.SUBSCRIBE) {
String username = accessor.getUser().getName();
String queueName = Stream.of(accessor.getDestination().split("/"))
.reduce((f, s) -> s)
.orElseThrow(IllegalStateException::new);
accessor.setNativeHeader("x-queue-name", String.format("%s_%s", username, queueName));
accessor.setNativeHeader("x-expires", Integer.toString(EXPIRE_IN_MILLISECONDS));
accessor.setNativeHeader("durable", "true");
accessor.setNativeHeader("auto-delete", "false");
return MessageBuilder.createMessage(message.getPayload(), accessor.getMessageHeaders());
}
return message;
}
}
推荐阅读
- visual-studio - 文件夹中的新文件会自动添加到 TFS (TFVC) 挂起的更改中吗?
- javascript - 在溢出div的水平滚动上,填充进度条
- cordova - 即使设置了 httpMethod,cordova-plugin-file-transfer 上传也显示为 GET 请求
- django - 访问调用 celery 任务时传递的额外数据
- python - Python beautifulsoup:删除元素中的特定元素
- qt - QML:如何在 RowAlignment 中均匀分布?
- javascript - CSS / Javascript动画高度为具有不同高度的项目展开和折叠
- java - 如何获取当前选定的项目名称?
- bash - 在 gmake $(shell) 函数中转义 #
- angular - 角度材料输入字段在 mat-list 或 mat-option 中不起作用