首页 > 解决方案 > Kotlin 协程未来等待超时(无取消)

问题描述

鉴于我们有一个CompletableFuture f, 在 kotlin 可挂起的范围内,我们可以调用f.await()并且我们将挂起直到它完成。

我在实现一个带有签名的类似函数时遇到了麻烦,如果未来在该持续时间内完成(以先发生者为准),该签名f.await(t)必须暂停最大毫秒或更快返回。t

这是我尝试过的。

/**
 * Suspend current method until future is done or specified duration expires,
 * whichever happens first without cancelling the future.
 * Returns true if its done, false otherwise.
 */
suspend fun <T> ListenableFuture<T>.await(duration: Long): Boolean {
   val future = this
   try {
      withTimeout(duration) {
         withContext(NonCancellable) { // this does not help either
            future.await() // i do not expect the future itself to be cancelled
         }
      }
   } catch (t: TimeoutCancellationException) {
      // we expected this
   } catch (e: Throwable) {
      e.printStackTrace()
   }

   return future.isDone

}

fun main(args: Array<String>) = runBlocking<Unit> {
   val future = GlobalScope.future {
      try {
         repeat(5) {
            println("computing")
            delay(500)
         }
         println("complete")
      } finally {
         withContext(NonCancellable) {
            println("cancelling")
            delay(500)
            println("cancelled")
         }
      }
   }

   for (i in 0..10) {
      if (future.await(2000)) {
         println("checking : done")
      } else {
         println("checking : not done")
      }
   }
}

我还需要一个类似的功能来完成工作。但也许这个解决方案也会帮助我......

输出为

computing
computing
computing
computing
checking : done
checking : done
checking : done
checking : done
cancelling
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done
checking : done

标签: javakotlinkotlin-coroutinescoroutinecompletable-future

解决方案


我写了一些测试代码:

fun main(args: Array<String>) = runBlocking {
    val future = calculateAsync()
    val result = future.await(2000)
    println("result=$result")
}

suspend fun <T> CompletableFuture<T>.await(duration: Long): T? {
    val future = this
    var result: T? = null
    try {
        withTimeout(duration) {
            result = future.await()
        }
    } catch (t: TimeoutCancellationException) {
        println("timeout exception")
    } catch (e: Throwable) {
        e.printStackTrace()
    }

    return result
}

@Throws(InterruptedException::class)
fun calculateAsync(): CompletableFuture<String> {
    val completableFuture = CompletableFuture<String>()

    Executors.newCachedThreadPool().submit {
        Thread.sleep(3000)
        println("after sleep")
        completableFuture.complete("Completed")
    }

    return completableFuture
}

运行此代码后,我们将得到一个输出:

timeout exception
result=null
after sleep

我们看到我们的扩展函数await返回null,因为我们将超时设置为 2000 毫秒,但CompletableFuture在 3000 毫秒后完成。在这种情况下CompletableFuture被取消(它的isCancelled属性返回true),但我们在calculateAsync函数中运行的线程继续执行(我们在日志中看到它after sleep)。

如果我们future.await(4000)main函数中将超时持续时间设置为 4000 毫秒,我们将看到下一个输出:

after sleep
result=Completed

现在我们有了一些结果,因为CompletableFuture执行速度超过 4000 毫秒。


推荐阅读