首页 > 解决方案 > Scala调用多个期货 - 可用时使用队列

问题描述

我有一个用例,我将调用一个 API,它将返回我需要进行第二次 API 调用的 ID 列表。我在这里尝试使用 Scala Futures,但似乎卡住了。

我有一个按比例缩小的示例,我将向 inpQueue 添加第一个 API 的休止符,然后迭代 inpQueue 以进行第二个 APi 调用。一旦 inpQueue 中有数据,我就需要开始进行第二次 API 调用。我尝试使用 Stream.continually(inpQueue),但它似乎不起作用。

object DependentFutures {

  val datasets: immutable.Seq[(Int, String)] = 
Seq((1,"2X6barD"), (2,"3d9vCgW"), (3,"2M02Xz0"), (4,"2XOu2uL"), (5,"2AfBWF0")).toList


  var inpQueue  =  new ArrayBlockingQueue[Future[ (Int, String)]](1000)

  def addToQueue(params: (Int, String))(implicit ec: ExecutionContext):Future[ (Int, String)] = Future {
    val d = Future {(params._1 + 1, params._2)}
    inpQueue.put(d)

    println(new java.sql.Timestamp(System.currentTimeMillis())+" -> "+params._1 )
    (params._1+1, params._2)
  }

  def main(args: Array[String]): Unit = {


    val executor: ExecutorService = Executors.newFixedThreadPool(1)
    implicit val executionContext: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(executor)



    val q1 = Future.traverse(datasets)(addToQueue).andThen {
      case result =>
        println("* processing is over, shutting down the executor")
        executionContext.shutdown()
    }

    val x = Future.sequence(in)
    Await.result(inpQueue, Duration(10, "seconds")))(implicit ec: ExecutionContext)
  }
}

标签: scalaasynchronousfunctional-programmingfuturescala-collections

解决方案


如果你想处理队列,你应该在某个循环的某个地方从它那里轮询。如果Future队列中有 s - 您可以通过 foreach 函数进行处理。

  def process(params: (Int, String)): Unit = ???
  
  def main(args: Array[String]): Unit = {
    {
      implicit val executionContext: ExecutionContextExecutorService = 
        ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
      Future.traverse(datasets)(addToQueue).andThen {
        case result =>
          println("* processing is over, shutting down the executor")
          executionContext.shutdown()
      }
    }
    
    implicit val ec: ExecutionContext  = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
    while (true) {
      val currentResultFuture = inpQueue.poll(Long.MaxValue, SECONDS)
      currentResultFuture.foreach(process)
    }
  }

推荐阅读