首页 > 解决方案 > 如何在akka中实现并发处理?

问题描述

我有一个方法可以多次调用 db。由于我没有实现任何并发处理,第二个数据库调用必须等到第一个数据库调用完成,第三个必须等到第二个完成等等。

所有 db 调用都是相互独立的。我想以这样一种方式做到这一点,即所有数据库调用同时运行。

我是 Akka 框架的新手。

有人可以帮助我提供小样本或参考资料会有所帮助。应用程序是在 Scala Lang 中开发的。

标签: scalaconcurrencyakka

解决方案


对于给定的示例需求,您可以通过三种主要方式实现并发。

期货

对于问题中询问的特定用例,我会在任何 akka 构造之前推荐 Futures。

假设我们将数据库调用作为函数:

type Data = ???

val dbcall1 : () => Data = ???

val dbcall2 : () => Data = ???

val dbcall3 : () => Data = ???

可以很容易地应用并发,然后可以使用 Futures 收集结果:

val f1 = Future { dbcall1() }
val f2 = Future { dbcall2() }
val f3 = Future { dbcall3() }

for {
  v1 <- f1
  v2 <- f2
  v3 <- f3
} {
  println(s"All data collected: ${v1}, ${v2}, ${v3}")
}

阿卡溪流

一个类似的堆栈答案,它演示了如何使用该akka-stream库进行并发数据库查询。

阿卡演员

也可以写一个Actor来做查询:

object MakeQuery

class DBActor(dbCall : () => Data) extends Actor {
  override def receive = {
    case _ : MakeQuery => sender ! dbCall()
  }
}

val dbcall1ActorRef = system.actorOf(Props(classOf[DBActor], dbcall1)) 

但是,在这个用例中,Actor 的用处不大,因为您仍然需要将所有数据收集在一起。

您可以使用与“期货”部分相同的技术:

val f1 : Future[Data] = (dbcall1ActorRef ? MakeQuery).mapTo[Data]

for {
  v1 <- f1
  ...

或者,您必须通过构造函数手动将 Actor 连接在一起,并处理等待另一个 Actor 的所有回调逻辑:

class WaitingDBActor(dbCall : () => Data, previousActor : ActorRef) {
  override def receive = {
    case _ : MakeQuery => previousActor forward MakeQuery

    case previousData : Data => sender ! (dbCall(), previousData)
  }
}

推荐阅读