首页 > 解决方案 > 从 coroutineScope 内部创建actor会阻塞线程,但是作为 CoroutineScope 的扩展函数创建的相同actor不会

问题描述

我正在尝试在 kotlin 中使用 actor builder 构造。我已经编写了下面的代码来发送和接收来自演员的消息。

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach

class GreetingsMessage(val to: String, val greetings: CompletableDeferred<String>)

fun CoroutineScope.newGreeter(greet: String) = actor<GreetingsMessage> {
        channel.consumeEach {
            it.greetings.complete("$greet ${it.to}")
        }
    }

fun main() {
    runBlocking {
        val greeter = newGreeter("Hello")
        val greetingsMessage = GreetingsMessage("World", CompletableDeferred())
        launch(Dispatchers.Default) {
            greeter.send(greetingsMessage)
        }
        launch(Dispatchers.Default) {
            println(greetingsMessage.greetings.await())
            greeter.close()
        }
    }
}

此代码按预期工作。但下面的代码不是,因为它正在挂起程序。

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.channels.consumeEach

class GreetingsMessage(val to: String, val greetings: CompletableDeferred<String>)

suspend fun newGreeter(greet: String) = coroutineScope {
    actor<GreetingsMessage> {
        channel.consumeEach {
            it.greetings.complete("$greet ${it.to}")
        }
    }
}

fun main() {
    runBlocking {
        val greeter = newGreeter("Hello")
        val greetingsMessage = GreetingsMessage("World", CompletableDeferred())
        launch(Dispatchers.Default) {
            greeter.send(greetingsMessage)
        }
        launch(Dispatchers.Default) {
            println(greetingsMessage.greetings.await())
            greeter.close()
        }
    }
}

通过将 newGreeter 函数设置为挂起函数并通过 coroutineScope 封闭该函数对代码进行了轻微修改,对 newGreeter 方法的调用将阻塞线程并使其挂起程序。我相信 newGreeter 作为 CoroutineScope 的扩展函数和包含在 coroutineScope 内的挂起函数应该完全一样。

我想知道这两种方法之间的区别以及为什么第二种方法会挂起程序。

我用生产函数尝试了同样的事情,在这里我还发现调用暂停函数以获取 ReceieveChannel 正在阻塞线程,其中用作扩展函数的相同生产构造按预期工作

这段代码是非阻塞的

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun CoroutineScope.produceIntegers(n: Int) = produce<Int> {
        for (i in 1..n)
            send(i)
        close()
    }

fun main() {
    runBlocking {
        val intChan = produceIntegers(10)
        launch {
            for (i in intChan)
                println(i)
        }
    }
}

因为这阻塞了对produceIntegers方法的调用

package com.byteobject.prototype.kotlin

import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun produceIntegers(n: Int) = coroutineScope {
    produce<Int> {
        for (i in 1..n)
            send(i)
        close()
    }
}

fun main() {
    runBlocking {
        val intChan = produceIntegers(10)
        launch {
            for (i in intChan)
                println(i)
        }
    }
}

标签: kotlinkotlin-coroutines

解决方案


问题是coroutineScope { } 创建了一个新的阻塞范围(结构化并发) - 等待所有启动的协程完成。

参见:https ://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/coroutine-scope.html

一旦给定块及其所有子协程完成,此函数就会返回。

另一方面,扩展函数只使用上下文(接收者)中的协程作用域。


推荐阅读