RSocket Android + Spring Boot backend routing error: No handler for destination ''

Problem description

I get ApplicationErrorException: No handler for destination '' when trying to connect from Android code to my web server (Spring Boot) using RSocket. I use WebSockets as the transport.

On the server side I use:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-rsocket</artifactId>
</dependency>

On the client side I tried both rsocket-java (Netty) and rsocket-kotlin (Ktor):

implementation 'io.rsocket:rsocket-core:1.1.1'
implementation 'io.rsocket:rsocket-transport-netty:1.1.1'

implementation 'io.rsocket.kotlin:rsocket-core:0.13.1'
implementation 'io.rsocket.kotlin:rsocket-transport-ktor:0.13.1'
implementation 'io.rsocket.kotlin:rsocket-transport-ktor-client:0.13.1'
implementation "io.ktor:ktor-client-cio:1.6.1"

Both Ktor and Netty give me the same error. The error log is as follows:

Android:

ApplicationErrorException (0x201): No handler for destination ''
        at io.rsocket.exceptions.Exceptions.from(Exceptions.java:76)
        at io.rsocket.core.RSocketRequester.handleFrame(RSocketRequester.java:261)
        at io.rsocket.core.RSocketRequester.handleIncomingFrames(RSocketRequester.java:211)
        at io.rsocket.core.RSocketRequester.$r8$lambda$kDn7LIfo960b6cXO3SLu8QVkTAE(Unknown Source:0)
        at io.rsocket.core.RSocketRequester$$ExternalSyntheticLambda2.accept(Unknown Source:4)
        at reactor.core.publisher.LambdaSubscriber.onNext(LambdaSubscriber.java:160)
        at io.rsocket.core.ClientServerInputMultiplexer$InternalDuplexConnection.onNext(ClientServerInputMultiplexer.java:248)
        at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:129)
        at io.rsocket.core.ClientServerInputMultiplexer.onNext(ClientServerInputMultiplexer.java:48)
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
        at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:365)
        at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:401)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:707)
        at reactor.netty.http.client.WebsocketClientOperations.onInboundNext(WebsocketClientOperations.java:161)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:764)
        Suppressed: java.lang.Exception: #block terminated with an error
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
        at reactor.core.publisher.Mono.block(Mono.java:1703)
        at com.rsockettester.MainActivity$connect$1$1.invokeSuspend(MainActivity.kt:86)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:738)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)

Here is the controller code used on the backend:

@Controller
class MainController {

    @MessageMapping("hello")
    fun hello() = "Hello!"

    @MessageMapping("name")
    fun helloName(name: String) = "Hello, $name!"
}

The code I use to connect from Android with 'io.rsocket:rsocket-transport-netty:1.1.1' is as follows:

    private fun connect(route: String, message: String): String? = runBlocking {
        withContext(Dispatchers.IO) {

            val ws: WebsocketClientTransport =
                WebsocketClientTransport.create(URI.create(hostUrl))
            val clientRSocket = RSocketConnector.connectWith(ws).block()
            
            return@withContext try {
                
                val compositeByteBuf = CompositeByteBuf(ByteBufAllocator.DEFAULT, false, 1);
                val routingMetadata = TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT, listOf(route))
                CompositeMetadataCodec.encodeAndAddMetadata(compositeByteBuf, ByteBufAllocator.DEFAULT, 
                                        WellKnownMimeType.MESSAGE_RSOCKET_ROUTING, routingMetadata.content)
                val md = ByteBufUtil.getBytes(compositeByteBuf)
                val payload = DefaultPayload.create(message.toByteArray(), md)
                
                val s = clientRSocket?.requestResponse(payload)
                s?.block()?.dataUtf8
            } catch (e: Exception) {
                Log.e("net", "RSocket cannot connect: ", e)
                e.asString()
            } finally {
                clientRSocket?.dispose()
            }

        }
    }

The code for connecting with Ktor (as described here) is as follows:

    private fun connect(route: String, message: String): String? = runBlocking {
        val client = HttpClient(CIO) { //create and configure ktor client
            install(WebSockets)
            install(RSocketSupport) {
                connector = RSocketConnector {
                    connectionConfig {
                        payloadMimeType = PayloadMimeType(
                            data = "application/json",
                            metadata = "application/json"
                        )
                    }

                    acceptor {
                        RSocketRequestHandler {
                            requestResponse { it } //echo request payload
                        }
                    }
                }
            }
            expectSuccess = false
        }

        var rSocket: RSocket? = try {
            client.rSocket(hostUrl)
        } catch (e: Exception) {
            Log.e("net", "RSocket cannot connect:", e)
            return@runBlocking "RSocket cannot connect: ${e.asString()}"
        }

        return@runBlocking try {
            val payload = Payload(ByteReadPacket(message.toByteArray()),
                CompositeMetadata(RoutingMetadata(route)).toPacket())
            val response = rSocket?.requestResponse(payload)
            Log.d("net", "reached response")
            response?.let { it.data.readUTF8Line() }
        } catch (e: Exception) {
            e.printStackTrace()
            "RSocket cannot connect: ${e.asString()}"
        }
    }

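As mentioned above, both approaches lead to the same result: No handler for destination ''

It is worth mentioning that the problem does not occur when I call the same routes from another Spring Boot client.

Does anyone know what I am doing wrong? I would be grateful if someone could point out my mistake. Thanks in advance.

I created a sample project on GitHub to help reproduce this error: rsocket-android-spring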

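Steps to reproduce:

  1. Clone or download the GitHub project rsocket-android-spring
  2. Run the Spring Boot server
  3. Edit the hostUrl variable so that it contains the correct IP address of your PC (!)
  4. Run the Android application and click the "Send" button

If you want to switch from Netty to Ktor on Android, you can use the commented-out method in the MainActivity code, but don't forget to enable the required dependencies in build.gradle (they are already present there).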

Tags: android, spring-boot, routes, rsocket, rsocket-java

Solution


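The problem was in the metadata setup.

Following the example from rsocket-kotlin, I had set the metadata type to metadata = "application/json", but to use routing I need metadata = "message/x.rsocket.composite-metadata.v0". The payload itself already carried the route as composite metadata; what was missing was the matching metadata MIME type declared when the connection is set up. Without it, the server apparently cannot extract the route and ends up with the empty destination '' from the error message. The same applies to the Netty client, where the connector has to declare this metadata MIME type explicitly.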
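To make it clearer what the server has to decode, here is a minimal, library-free sketch of the bytes that a composite metadata entry for a route consists of, following the RSocket routing and composite metadata extensions. The function names (encodeRoutingMetadata, encodeCompositeEntry, toHex) are hypothetical and exist only for illustration; in real client code the library codecs (such as TaggingMetadataCodec and CompositeMetadataCodec) should be used instead.

```kotlin
// Hand-rolled illustration of the wire layout; not a replacement for the library codecs.

// Well-known MIME ID of message/x.rsocket.routing.v0 in the RSocket extension registry.
const val ROUTING_MIME_ID = 0x7E

// Routing metadata: each route tag is a 1-byte length followed by its UTF-8 bytes.
fun encodeRoutingMetadata(route: String): ByteArray {
    val bytes = route.toByteArray(Charsets.UTF_8)
    require(bytes.size <= 255) { "route tag must fit in 255 bytes" }
    return byteArrayOf(bytes.size.toByte()) + bytes
}

// Composite metadata entry: (0x80 | well-known MIME ID), 24-bit big-endian length, content.
fun encodeCompositeEntry(mimeId: Int, content: ByteArray): ByteArray {
    val header = byteArrayOf(
        (0x80 or mimeId).toByte(),
        (content.size shr 16).toByte(),
        (content.size shr 8).toByte(),
        content.size.toByte()
    )
    return header + content
}

// Renders bytes as lowercase hex, separated by spaces.
fun toHex(bytes: ByteArray): String =
    bytes.joinToString(" ") { "%02x".format(it.toInt() and 0xFF) }

fun main() {
    val metadata = encodeCompositeEntry(ROUTING_MIME_ID, encodeRoutingMetadata("hello"))
    println(toHex(metadata)) // prints: fe 00 00 06 05 68 65 6c 6c 6f
}
```

These bytes are only interpreted as a route when the connection was set up with message/x.rsocket.composite-metadata.v0 as its metadata MIME type; under any other declared type the server has no reason to parse them this way.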

Many thanks to @haal for the detailed answer.

The code for connecting from Android now looks like this:

    private fun getPayload(route: String, message: String): Payload {
        val metadata = ByteBufAllocator.DEFAULT.compositeBuffer()
        val routingMetadata =
            TaggingMetadataCodec.createRoutingMetadata(ByteBufAllocator.DEFAULT, listOf(route))
        CompositeMetadataCodec.encodeAndAddMetadata(
            metadata,
            ByteBufAllocator.DEFAULT,
            WellKnownMimeType.MESSAGE_RSOCKET_ROUTING,
            routingMetadata.content
        )
        val data = ByteBufAllocator.DEFAULT.buffer().writeBytes(message.toByteArray())

        return DefaultPayload.create(data, metadata)
    }

    private fun connect(route: String, message: String): String? = runBlocking {
        withContext(Dispatchers.IO) {

            val ws: WebsocketClientTransport =
                WebsocketClientTransport.create(URI.create(hostUrl))
            val clientRSocket = RSocketConnector.create()
                // the metadata MIME type must be declared so the server can read the route
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.string)
                // hostUrl (used by ws) must point at the server's RSocket port,
                // e.g. the value of spring.rsocket.server.port such as 7000
                .connect(ws)
                .block()
            return@withContext try {
                val s = clientRSocket?.requestResponse(getPayload(route, message))
                s?.block()?.dataUtf8
            } catch (e: Exception) {
                Log.e("net", "RSocket cannot connect: ", e)
                e.asString()
            } finally {
                clientRSocket?.dispose()
            }

        }
    }
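For the Ktor variant from the question, the same fix would presumably be a one-line change in connectionConfig. The fragment below is a sketch only: it reuses the configuration from the question with the metadata MIME type swapped, the function name createRoutingClient is hypothetical, and the import paths are my assumption for rsocket-kotlin 0.13.1 with Ktor 1.6.1 and may differ in other versions.

```kotlin
import io.ktor.client.HttpClient
import io.ktor.client.engine.cio.CIO
import io.ktor.client.features.websocket.WebSockets
import io.rsocket.kotlin.core.RSocketConnector
import io.rsocket.kotlin.payload.PayloadMimeType
import io.rsocket.kotlin.transport.ktor.client.RSocketSupport

// Hypothetical helper: builds a Ktor client whose RSocket connections
// declare composite metadata, so routes sent as composite metadata can be read.
fun createRoutingClient(): HttpClient = HttpClient(CIO) {
    install(WebSockets)
    install(RSocketSupport) {
        connector = RSocketConnector {
            connectionConfig {
                payloadMimeType = PayloadMimeType(
                    // data type carried over unchanged from the question
                    data = "application/json",
                    // was "application/json" in the failing version
                    metadata = "message/x.rsocket.composite-metadata.v0"
                )
            }
        }
    }
}
```

The payload construction from the question, CompositeMetadata(RoutingMetadata(route)).toPacket(), can stay as it is, since it already produces composite metadata.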

I have updated the repository, and routing from the Android client to the Spring Boot WebFlux server now works as expected.

Many thanks to the RSocket developers for such a great tool!

