首页 > 解决方案 > Spring Boot WebSocketStompClient 未收到消息

问题描述

目前,我可以看到在测试用例中我能够成功连接并向 WS 服务器/端点发送消息。但是,我没有收到任何消息。测试用例中的completeableFuture对象等待消息 10 秒然后超时。我也尝试调试源代码,其中我可以session看到destination正确subscribers加载

我的 WebSocket 配置:

@EnableWebSocketMessageBroker
class WebSocketConfig : WebSocketMessageBrokerConfigurer {

    override fun registerStompEndpoints(registry: StompEndpointRegistry) {
        registry.addEndpoint("/ws").setAllowedOrigins("*").withSockJS()
    }

    override fun configureMessageBroker(registry: MessageBrokerRegistry) {
        registry.enableSimpleBroker( "/topic/")
        registry.setApplicationDestinationPrefixes("/api/")
        //registry.setUserDestinationPrefix("/user")
    }

控制器:

class ChatController(private val chatService: ChatService) {

    @MessageMapping("/user/chat/{channelId}")
    @SendTo("/topic/chat/{channelId}")
    fun chatMessage(@DestinationVariable("channelId") channelId: UUID, chatMessageDTO: ChatMessageDTO): ChatNotification {
        return chatService.submitMessage(chatMessageDTO, channelId)
    }

服务:

 fun establishChatSession(chatChannelDTO: ChatChannelDTO): ChatChannelDTO {
        if (chatChannelDTO.userOne == chatChannelDTO.userTwo) {
            throw InvalidInputDataException("")
        }
        val optionalChatChannel = getExistingChannel(chatChannelDTO)
        return if (optionalChatChannel.isPresent) {
            ChatChannelDTO.fromChatChannel(optionalChatChannel.get())
        } else {
            newChatSession(chatChannelDTO)
        }
    }

测试 :

class ChatControllerIT(@Autowired private val chatService: ChatService, @Autowired private val  simpleMessagingTemplate: SimpMessagingTemplate) {

    @Value("\${local.server.port}")
    var port = 0;

    var completableFuture: CompletableFuture<ChatNotification> = CompletableFuture()
    lateinit var webSocketStompClient: WebSocketStompClient

    @BeforeEach
    fun setup() {
        this.webSocketStompClient = WebSocketStompClient(SockJsClient(listOf(WebSocketTransport(StandardWebSocketClient()))))
        webSocketStompClient.messageConverter = MappingJackson2MessageConverter()
    }

    @Test
    fun verifyGreetingIsReceived() {
        val channel = chatService.establishChatSession(ChatChannelDTO(userOne = UUID.randomUUID(), userTwo = UUID.randomUUID()))
        val stompSession = webSocketStompClient.connect("ws://localhost:$port/ws", object : StompSessionHandlerAdapter() {}).get(10, TimeUnit.SECONDS)
        println("subscribing to::::::::::   /topic/chat/${channel.channelId}")

        val message = ChatMessageDTO(message = "Hello", senderId = channel.userOne, senderName = "Pranav", recipientName = "Monika", recipientId = channel.userTwo)
        stompSession.send("/api/user/chat/${channel.channelId}", message)

        stompSession.subscribe("/topic/chat/${channel.channelId}", object: StompFrameHandler{

            override fun getPayloadType(headers: StompHeaders): Type {
                return ChatNotification::class.java
            }

            override fun handleFrame(headers: StompHeaders, @Nullable payload: Any?) {
                completableFuture.complete(payload as ChatNotification)
            }
        })

        val response = completableFuture.get(10, TimeUnit.SECONDS)
        println(response)
    }
}

关于这里出了什么问题的任何想法?

标签: javaspring-bootkotlinspring-websocket

解决方案


这段代码对我来说是正确的。我创建了示例项目并且它有效。您确定 local.server.port 具有正确的值。试试这个测试课。如果它不起作用,请直接与我联系,我将向您发送示例项目

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
class ChatControllerIT(@Autowired private val chatService: ChatService) {

    @LocalServerPort
    var port = 0

    var completableFuture: CompletableFuture<ChatNotification> = CompletableFuture()
    lateinit var webSocketStompClient: WebSocketStompClient

    @BeforeEach
    fun setup() {
        this.webSocketStompClient = WebSocketStompClient(SockJsClient(listOf(WebSocketTransport(StandardWebSocketClient()))))
        webSocketStompClient.messageConverter = MappingJackson2MessageConverter()
    }

    @Test
    fun verifyGreetingIsReceived() {
        val channel = chatService.establishChatSession(ChatChannelDTO(userOne = UUID.randomUUID(), userTwo = UUID.randomUUID()))
        val stompSession = webSocketStompClient.connect("ws://localhost:$port/ws", object : StompSessionHandlerAdapter() {}).get(10, TimeUnit.SECONDS)
        println("subscribing to::::::::::   /topic/chat/${channel.channelId}")

        val message = ChatMessageDTO(message = "Hello", senderId = channel.userOne, senderName = "Pranav", recipientName = "Monika", recipientId = channel.userTwo)
        stompSession.send("/api/user/chat/${channel.channelId}", message)

        stompSession.subscribe("/topic/chat/${channel.channelId}", object: StompFrameHandler {

            override fun getPayloadType(headers: StompHeaders): Type {
                return ChatNotification::class.java
            }

            override fun handleFrame(headers: StompHeaders, @Nullable payload: Any?) {
                completableFuture.complete(payload as ChatNotification)
            }
        })

        val response = completableFuture.get(10, TimeUnit.SECONDS)
        println(response)
    }
}

推荐阅读