首页 > 解决方案 > 如何在 Java 代码中使用 Kotlin 协程实现 NIO Socket(客户端)?

问题描述

我想使用 Kotlin(v1.3.0)协程和 java.nio.channels。SocketChannel (NIO) 替换connectAndroid 中的 Socket(阻塞 IO)。因为这样可以节省很多线程。

下面的代码无法运行,因为job.await()它是 Kotlin 中的挂起函数,它只能在 Ktolin 协程块中调用。像launch{..}async{..}

// this function will be called by Java Code
fun connect(address: InetSocketAddress, connectTimeout: Int): SocketChannel {

    // Start a new connection
    // Create a non-blocking socket channel
    val socketChannel = SocketChannel.open()
    socketChannel.configureBlocking(false)

    // async calls NIO connect function
    val job = GlobalScope.async(oneThreadCtx) {
        aConnect(socketChannel, address)
    }

    // I what to suspend(NOT block) current Java Thread, until connect is success
    job.await()

    return socketChannel
}

但是,我尝试将runBlocking{..}这个函数用作 Java 中的普通函数。但job.await阻止了当前的 Java 线程,而不是暂停

那么,我应该如何使用 Kotlin(v1.3.0) 协程实现这个功能?

标签: androidkotlinokhttpkotlinx.coroutinessocketchannel

解决方案


正如 Marko 指出的那样,即使该阻塞操作在异步协程中,您的代码仍将最终阻塞线程。要使用 Java 和 Kotlin 真正获得所需的异步行为,您需要使用Socket Channel的异步版本

有了这个,您可以获得真正的异步套接字处理。使用该类和 Kotlin 的suspendCoroutine构建器方法,您可以将异步处理程序转换为可挂起的调用。

这是一个实现读取的示例:

class TcpSocket(private val socket: AsynchronousSocketChannel) {
    suspend fun read(buffer: ByteBuffer): Int {
        return socket.asyncRead(buffer)
    }

    fun close() {
        socket.close()
    }

    private suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
        return suspendCoroutine { continuation ->
           this.read(buffer, continuation, ReadCompletionHandler)
        }
    }

    object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
        override fun completed(result: Int, attachment: Continuation<Int>) {
            attachment.resume(result)
        }

        override fun failed(exc: Throwable, attachment: Continuation<Int>) {
            attachment.resumeWithException(exc)
        }
    }
}

您可以选择删除我在这里所做的包装,然后asyncRead在 AsynchronousSocketChannel 上公开一个方法,如下所示:

suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
    return suspendCoroutine { continuation ->
       this.read(buffer, continuation, ReadCompletionHandler)
    }
}

object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
    override fun completed(result: Int, attachment: Continuation<Int>) {
        attachment.resume(result)
    }

    override fun failed(exc: Throwable, attachment: Continuation<Int>) {
        attachment.resumeWithException(exc)
    }
}

这完全取决于品味以及您的设计目标到底是什么。您应该能够为初始连接实现类似的方法,就像我在此处阅读时所做的那样。


推荐阅读