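spring - Spring Stomp slow connection
Problem description
I am seeing a slow STOMP connection between two Spring microservices: the client takes about 5 minutes to send the STOMP CONNECT message. Both microservices run on the same node.
Spring Boot version: 2.1.3.RELEASE
The WebSocket server is configured as follows: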
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.enableSimpleBroker("/topic");
        config.setApplicationDestinationPrefixes("/app");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/ws-subscribe").setAllowedOrigins("*").withSockJS();
    }
}
The client is configured like this:
@Component
public class WebsocketClient {

    private GlobalConfiguration globalConfiguration;
    private WebsocketListener websocketListener;
    private WebSocketStompClient stompClient;
    @Autowired
    private JwtTokenGenerator jwtTokenGenerator;
    private ApplicationContext appContext;
    private Logger LOGGER = LoggerFactory.getLogger(WebsocketClient.class);

    @Autowired
    public WebsocketClient(ApplicationContext context, GlobalConfiguration globalConfiguration) {
        Transport webSocketTransport = new WebSocketTransport(new StandardWebSocketClient());
        List<Transport> transports = Collections.singletonList(webSocketTransport);
        SockJsClient sockJsClient = new SockJsClient(transports);
        sockJsClient.setMessageCodec(new Jackson2SockJsMessageCodec());
        this.stompClient = new WebSocketStompClient(sockJsClient);
        this.globalConfiguration = globalConfiguration;
        this.websocketListener = new WebsocketListener(context);
    }

    public ListenableFuture<StompSession> connectToWebsocket() {
        String url = globalConfiguration.getOrchestratorWs();
        StompHeaders connectHeaders = new StompHeaders();
        WebSocketHttpHeaders webSocketHttpHeaders = new WebSocketHttpHeaders();
        webSocketHttpHeaders.add("Authorization", jwtTokenGenerator.token);
        LOGGER.info("webSocketHttpHeaders: " + webSocketHttpHeaders.toString());
        LOGGER.info("url: " + url);
        this.stompClient.setMessageConverter(new MappingJackson2MessageConverter());
        return stompClient.connect(url, webSocketHttpHeaders, connectHeaders, websocketListener);
    }
}
On the client side, we have this WebSocket listener:
public class WebsocketListener extends StompSessionHandlerAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(WebsocketListener.class);
    private GlobalConfiguration globalConfiguration;
    private ApplicationContext appContext;

    public WebsocketListener(ApplicationContext appContext) {
        this.appContext = appContext;
        this.globalConfiguration = appContext.getBean(GlobalConfiguration.class);
    }

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        LOGGER.info("New session established : " + globalConfiguration.getId());
        session.subscribe("/topic/injections/" + globalConfiguration.getId(), this);
        LOGGER.info("Subscribed to /topic/injections/" + globalConfiguration.getId());
        // session.send("/app/chat", getSampleMessage());
        // logger.info("Message sent to websocket server");
    }

    @Override
    public void handleException(StompSession session, StompCommand command, StompHeaders headers, byte[] payload, Throwable exception) {
        LOGGER.error("Got an exception", exception);
    }

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return Map.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        LOGGER.info("Injection Received : " + payload);
        InjectionHandler injectionHandler = new InjectionHandler(appContext, (HashMap) payload);
        injectionHandler.start();
    }
}
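afterConnected is called about 5 minutes after connecting to ws://10.2.0.43:7071/ws-subscribe/513/8306a7ac357847678795b049450fb6c5/websocket
Looking at the source code of the STOMP library, the code where the microservice is stuck for those 5 minutes is the following: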
public ListenableFuture<StompSession> connect(URI url, @Nullable WebSocketHttpHeaders handshakeHeaders, @Nullable StompHeaders connectHeaders, StompSessionHandler sessionHandler) {
    Assert.notNull(url, "'url' must not be null");
    ConnectionHandlingStompSession session = this.createSession(connectHeaders, sessionHandler);
    WebSocketStompClient.WebSocketTcpConnectionHandlerAdapter adapter = new WebSocketStompClient.WebSocketTcpConnectionHandlerAdapter(session);
    this.getWebSocketClient().doHandshake(adapter, handshakeHeaders, url).addCallback(adapter);
    return session.getSessionFuture();
}
In particular, the session.getSessionFuture() call. Any ideas about what could be delaying the CONNECT message sent by the client?
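Since connect(...) hands back a future rather than a finished session, one way to narrow the problem down is to measure how long that future takes to complete and to bound the wait with a timeout. The following is only a minimal sketch using the JDK alone: the class name ConnectTiming and the method timeUntilDone are hypothetical, and the simulated future in main merely stands in for the real session future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Spring: times how long a future takes to complete.
public class ConnectTiming {

    /**
     * Waits for the future for at most timeoutMillis.
     * Returns the elapsed time in milliseconds, or -1 if the timeout expired first.
     */
    public static <T> long timeUntilDone(CompletableFuture<T> future, long timeoutMillis) {
        long start = System.nanoTime();
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return -1L;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("future failed", e.getCause());
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Stand-in for the session future: completes after a simulated 200 ms delay.
        CompletableFuture<String> sessionFuture = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "CONNECTED";
        });

        long elapsed = timeUntilDone(sessionFuture, 5_000);
        System.out.println(elapsed < 0 ? "timed out" : "session ready after " + elapsed + " ms");
    }
}
```

With Spring Framework 5.x, ListenableFuture offers completable(), so the real call could look like ConnectTiming.timeUntilDone(websocketClient.connectToWebsocket().completable(), 360_000), where websocketClient is assumed to be the WebsocketClient bean shown above. Logging a timestamp before connect(...) and comparing it with this value would show whether the delay sits between the handshake and the completion of the session future.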
Solution