首页 > 解决方案 > 运行 Scala 线程

问题描述

我是 Scala 的绝对初学者,但有这个问题要解决。所以我有一个参数列表

itemList = List('abc', 'def', 'ghi','jkl','mno', 'pqr')

我有这 3 个参数查询

 val q1 = "env='dev1'&id='123'&listitem='xyz'"
 val q2 = "env='dev2'&id='1234'&listitem='xyz'"
 val q3 = "env='dev3'&id='12345'&listitem='xyz'"

 val report1 = getReport(q1)
 val report2 = getReport(q2)
 val report3 = getReport(q3)

所以我试图遍历列表,用 listitem 替换 q1、q2 和 q3 中的 listitem 参数,然后为列表中的每个项目运行 http 请求报告。

由于每个 getReport 请求都是异步的,我需要等待,因此我无法转到列表中的下一项,就像我要执行循环一样。

所以我想为列表中的每个项目启动 3 个线程,然后将 3 个报告合并为 1 个最终报告,或者我可以按顺序执行。

我将如何为列表中的每个项目使用 3 个线程?这是我的想法:

val reportToken = [ q1, q2,q3 ]
val listTasks = [ getReport(q1) , getReport(q2) , getReport(q3) ]

for (i <- 1 to 3) {
    val thread = new Thread {
        override def run {
            listTasks (reportToken(i))
        }
   val concat += listTask(i) 
    }
    thread.start
    Thread.sleep(50)  
}

标签: multithreadingscalahttp

解决方案


您可以将每个任务包装在 中Future,申请map/recover处理成功/失败的 Futures,并使用Future.sequence将 Futures 列表转换为Future of list. 这是一个简单的例子:

import scala.concurrent.{Future, Await}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

def getReport(q: String) = q match {
  case "q2" => throw new Exception()
  case q => s"got $q"
}

val reportToken = Seq("q1", "q2", "q3")
val listTasks = reportToken.map( q => Future{ getReport(q) } )
// listTasks: Seq[scala.concurrent.Future[String]] = ...

val f = Future.sequence(
  listTasks.map(_.map(Some(_)).recover{case _ => None})
)
// f: scala.concurrent.Future[Seq[Option[String]]] = ...

Await.result(f, Duration.Inf)
// res1: Seq[Option[String]] = List(Some(got q1), None, Some(got q3))

有关 Futures 的更多详细信息,请参阅相关的Scala 文档


推荐阅读