scala - Scala调用多个期货 - 可用时使用队列
问题描述
我有一个用例,我将调用一个 API,它将返回我需要进行第二次 API 调用的 ID 列表。我在这里尝试使用 Scala Futures,但似乎卡住了。
我有一个按比例缩小的示例,我将向 inpQueue 添加第一个 API 的休止符,然后迭代 inpQueue 以进行第二个 APi 调用。一旦 inpQueue 中有数据,我就需要开始进行第二次 API 调用。我尝试使用 Stream.continually(inpQueue),但它似乎不起作用。
object DependentFutures {
val datasets: immutable.Seq[(Int, String)] =
Seq((1,"2X6barD"), (2,"3d9vCgW"), (3,"2M02Xz0"), (4,"2XOu2uL"), (5,"2AfBWF0")).toList
var inpQueue = new ArrayBlockingQueue[Future[ (Int, String)]](1000)
def addToQueue(params: (Int, String))(implicit ec: ExecutionContext):Future[ (Int, String)] = Future {
val d = Future {(params._1 + 1, params._2)}
inpQueue.put(d)
println(new java.sql.Timestamp(System.currentTimeMillis())+" -> "+params._1 )
(params._1+1, params._2)
}
def main(args: Array[String]): Unit = {
val executor: ExecutorService = Executors.newFixedThreadPool(1)
implicit val executionContext: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executor)
val q1 = Future.traverse(datasets)(addToQueue).andThen {
case result =>
println("* processing is over, shutting down the executor")
executionContext.shutdown()
}
val x = Future.sequence(in)
Await.result(inpQueue, Duration(10, "seconds")))(implicit ec: ExecutionContext)
}
}
解决方案
如果你想处理队列,你应该在某个循环的某个地方从它那里轮询。如果Future
队列中有 s - 您可以通过 foreach 函数进行处理。
def process(params: (Int, String)): Unit = ???
def main(args: Array[String]): Unit = {
{
implicit val executionContext: ExecutionContextExecutorService =
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
Future.traverse(datasets)(addToQueue).andThen {
case result =>
println("* processing is over, shutting down the executor")
executionContext.shutdown()
}
}
implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
while (true) {
val currentResultFuture = inpQueue.poll(Long.MaxValue, SECONDS)
currentResultFuture.foreach(process)
}
}
推荐阅读
- c++ - 为什么编译 Qt BluetoothDiscoveryAgent 会导致链接器错误
- html - 如何在 chrome web view 输入文本中获取基本 html 以像其他文本元素或坏 chrome 用户代理一样缩放?
- javascript - 如果选择了另一个选项,如何取消选择来自 Array 的选项
- machine-learning - 梯度提升树如何计算分类错误?
- assembly - 以 64 位代码从 [esp] 加载时出现段错误
- python - 在列表中更改列表中值的类型
- mongodb - 如何使用 ReactiveMongo 正确更新插入
- c# - MongoDB 中的异步更改流
- laravel - 如何在 Laravel Markdown Mailable 中生成一个换行符
- javascript - 在javascript中动态创建一个表