kotlin - 在不阻塞当前线程的情况下,延迟函数如何在 Kotlin 中工作?
问题描述
这几天在学习协程,大部分概念都很清楚,但我不明白延迟函数的实现。
延迟功能如何在延迟时间后恢复协程?对于一个简单的程序,只有一个主线程,为了在延迟时间后恢复协程,我假设应该有另一个计时器线程来处理所有延迟的调用并稍后调用它们。这是真的吗?有人可以解释延迟功能的实现细节吗?
解决方案
TL; 博士;
使用 runBlocking 时,延迟在内部包装并在同一线程上运行,而当使用任何其他调度程序时,它会挂起并通过事件循环线程恢复继续来恢复。检查下面的长答案以了解内部情况。
长答案:
@Francesc 答案指向正确,但有些抽象,仍然没有解释延迟在内部的实际工作方式。
因此,正如他指出的延迟函数:
public suspend fun delay(timeMillis: Long) {
if (timeMillis <= 0) return // don't delay
return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> ->
cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
}
}
它的作用是“在挂起函数中获取当前的延续实例,并在 lambda 中运行块后挂起当前正在运行的协程”
所以这条线cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont)
将被执行,然后当前的协程被挂起,即释放它坚持的当前线程。
cont.context.delay
指着
internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay
这表示如果ContinuationInterceptor
是延迟的实现,则返回否则使用 DefaultDelay,它是internal actual val DefaultDelay: Delay = DefaultExecutor
一个 DefaultExecutor,它是internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {...}
EventLoop 的一个实现,并且有一个自己的线程可以运行。
注意:ContinuationInterceptor
是Delay
协程在 runBlocking 块中时的实现,以确保延迟在同一线程上运行,否则不会。检查此代码段以查看结果。
现在我找不到由 runBlocking 创建的延迟的实现,因为internal expect fun createEventLoop(): EventLoop
它是一个从外部实现的期望函数,而不是由源实现的。但是DefaultDelay
实现如下
public override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val timeNanos = delayToNanos(timeMillis)
if (timeNanos < MAX_DELAY_NS) {
val now = nanoTime()
DelayedResumeTask(now + timeNanos, continuation).also { task ->
continuation.disposeOnCancellation(task)
schedule(now, task)
}
}
}
这是如何scheduleResumeAfterDelay
实现的,它通过延迟传递的延续创建一个DelayedResumeTask
,然后调用最终调用传递对象的schedule(now, task)
哪个调用scheduleImpl(now, delayedTask)
delayedTask.scheduleTask(now, delayedQueue, this)
delayedQueue
@Synchronized
fun scheduleTask(now: Long, delayed: DelayedTaskQueue, eventLoop: EventLoopImplBase): Int {
if (_heap === kotlinx.coroutines.DISPOSED_TASK) return SCHEDULE_DISPOSED // don't add -- was already disposed
delayed.addLastIf(this) { firstTask ->
if (eventLoop.isCompleted) return SCHEDULE_COMPLETED // non-local return from scheduleTask
/**
* We are about to add new task and we have to make sure that [DelayedTaskQueue]
* invariant is maintained. The code in this lambda is additionally executed under
* the lock of [DelayedTaskQueue] and working with [DelayedTaskQueue.timeNow] here is thread-safe.
*/
if (firstTask == null) {
/**
* When adding the first delayed task we simply update queue's [DelayedTaskQueue.timeNow] to
* the current now time even if that means "going backwards in time". This makes the structure
* self-correcting in spite of wild jumps in `nanoTime()` measurements once all delayed tasks
* are removed from the delayed queue for execution.
*/
delayed.timeNow = now
} else {
/**
* Carefully update [DelayedTaskQueue.timeNow] so that it does not sweep past first's tasks time
* and only goes forward in time. We cannot let it go backwards in time or invariant can be
* violated for tasks that were already scheduled.
*/
val firstTime = firstTask.nanoTime
// compute min(now, firstTime) using a wrap-safe check
val minTime = if (firstTime - now >= 0) now else firstTime
// update timeNow only when going forward in time
if (minTime - delayed.timeNow > 0) delayed.timeNow = minTime
}
/**
* Here [DelayedTaskQueue.timeNow] was already modified and we have to double-check that newly added
* task does not violate [DelayedTaskQueue] invariant because of that. Note also that this scheduleTask
* function can be called to reschedule from one queue to another and this might be another reason
* where new task's time might now violate invariant.
* We correct invariant violation (if any) by simply changing this task's time to now.
*/
if (nanoTime - delayed.timeNow < 0) nanoTime = delayed.timeNow
true
}
return SCHEDULE_OK
}
它最终将任务设置为DelayedTaskQueue
当前时间。
// Inside DefaultExecutor
override fun run() {
ThreadLocalEventLoop.setEventLoop(this)
registerTimeLoopThread()
try {
var shutdownNanos = Long.MAX_VALUE
if (!DefaultExecutor.notifyStartup()) return
while (true) {
Thread.interrupted() // just reset interruption flag
var parkNanos = DefaultExecutor.processNextEvent() /* Notice here, it calls the processNextEvent */
if (parkNanos == Long.MAX_VALUE) {
// nothing to do, initialize shutdown timeout
if (shutdownNanos == Long.MAX_VALUE) {
val now = nanoTime()
if (shutdownNanos == Long.MAX_VALUE) shutdownNanos = now + DefaultExecutor.KEEP_ALIVE_NANOS
val tillShutdown = shutdownNanos - now
if (tillShutdown <= 0) return // shut thread down
parkNanos = parkNanos.coerceAtMost(tillShutdown)
} else
parkNanos = parkNanos.coerceAtMost(DefaultExecutor.KEEP_ALIVE_NANOS) // limit wait time anyway
}
if (parkNanos > 0) {
// check if shutdown was requested and bail out in this case
if (DefaultExecutor.isShutdownRequested) return
parkNanos(this, parkNanos)
}
}
} finally {
DefaultExecutor._thread = null // this thread is dead
DefaultExecutor.acknowledgeShutdownIfNeeded()
unregisterTimeLoopThread()
// recheck if queues are empty after _thread reference was set to null (!!!)
if (!DefaultExecutor.isEmpty) DefaultExecutor.thread // recreate thread if it is needed
}
}
// Called by run inside the run of DefaultExecutor
override fun processNextEvent(): Long {
// unconfined events take priority
if (processUnconfinedEvent()) return nextTime
// queue all delayed tasks that are due to be executed
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) {
val now = nanoTime()
while (true) {
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
// to make sure that 'isEmpty' and `nextTime` that check both of them
// do not transiently report that both delayed and queue are empty during move
delayed.removeFirstIf {
if (it.timeToExecute(now)) {
enqueueImpl(it)
} else
false
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
}
}
// then process one event from queue
dequeue()?.run()
return nextTime
}
然后 finally 的事件循环(运行函数)通过使任务出队并在延迟时间已到时通过调用 delay 来internal actual object DefaultExecutor : EventLoopImplBase(), Runnable {...}
恢复实际挂起的函数来处理任务。Continuation
推荐阅读
- javascript - 如何使用 javascript 获取附加的部分 url
- angular - Angular 8 & Laravel echo websockets with socket io:找不到 Socket.io 客户端
- c# - 不断调用 GTKSharp 应用程序旁边的方法
- google-apps-script - 在提交 Google 表单时找不到具有给定 ID 的项目
- sql - 选择在一列中具有共同值的行
- c# - 无法在 CIL 中调用 2 个函数
- documentum - 我们如何为 Documentum Webtop 中的每个存储库和文件柜配置不同的列首选项
- node.js - 分离网络服务器和处理服务器
- azure-devops - 获取 TF30063:您无权访问 TF 命令的 https://dev.azure.com/XXXX 错误
- wordpress - 为什么 wocommerce 电子邮件模板覆盖不起作用?