首页 > 解决方案 > 仅在处理完所有消息后才向 Akka actor 询问结果

问题描述

我试图将一大块文本分成多个段落,并通过调用外部 API 来同时处理它。每次响应来自该段落的 API 时,都会更新一个不可变列表。

处理完段落并更新列表后,我想向 Actor 询问最终状态,以便在接下来的步骤中使用。

以下方法的问题是我永远不知道何时处理所有段落。一旦处理完所有段落并且列表是最终的,我需要取回targetStore 。

def main(args: Array[String]) {
    val source = Source.fromFile("input.txt")
    val extDelegator = new ExtractionDelegator()
    source.getLines().foreach(line => extDelegator.processParagraph(line))
    extDelegator.getFinalResult()

  }


case class Extract(uuid: UUID, text: String)

case class UpdateList(text: String)

case class DelegateLambda(text: String)


case class FinalResult()


class ExtractionDelegator {
 
  val system = ActorSystem("ExtractionDelegator")
  val extActor = system.actorOf(Props(classOf[ExtractorDelegateActor]).withDispatcher("fixed-thread-pool"))
  implicit val executionContext = system.dispatchers.lookup("fixed-thread-pool")

  def processParagraph(text: String) = {
    extActor ! Extract(uuid, text)

  }

  def getFinalResult(): java.util.List[String] = {
    implicit val timeout = Timeout(5 seconds)
    val askActor = system.actorOf(Props(classOf[ExtractorDelegateActor]))
    val future = askActor ? FinalResult()
    val result = Await.result(future, timeout.duration).asInstanceOf[java.util.List[String]]
    result
  }

  def shutdown(): Unit = {
    system.terminate()
  }

}


/* Extractor Delegator actor*/
class ExtractorDelegateActor extends Actor with ActorLogging {
  var targetStore:scala.collection.immutable.List[String] = scala.collection.immutable.List.empty

  def receive = {
    case Extract(uuid, text) => {
      context.actorOf(Props[ExtractProcessor].withDispatcher("fixed-thread-pool")) ! DelegateLambda(text)

    }
    case UpdateList(res) => {
      targetStore = targetStore :+ res
    }
    case FinalResult() => {
      val senderActor=sender()
      senderActor ! targetStore

    }
  }
}

/* Aggregator actor*/
class ExtractProcessor extends Actor with ActorLogging {
  def receive = {
    case DelegateLambda(text) => {
      val res =callLamdaService(text)
      sender ! UpdateList(res)
    }

  }

  def callLamdaService(text: String): String = { 
    //THis is where external API is called.  
    Thread.sleep(1000)
    result
  }
}

标签: scalaakkaactorakka-httpakka-actor

解决方案


不知道你为什么要在这里使用演员,最简单的就是

// because you call external service, you have back async response most probably
def callLamdaService(text: String): Future[String]

并处理您所做的文本

implicit val ec = scala.concurrent.ExecutionContext.Implicits.global // use you execution context here
Future.sequence(source.getLines().map(callLamdaService)).map {results =>
  // do what you want with results
}

如果您仍想使用演员,您可以将其替换callLamdaServiceprocessParagraph内部将对ask工人演员执行的操作,返回结果的人(因此,签名processParagraph将是def processParagraph(text: String): Future[String]

如果您仍然想开始多个任务然后询问结果,那么您只需要使用context.becomewith receive(worker: Int),当您增加每条消息的工作人员Extract数量并减少每条消息的工作人员数量时UpdateListFinalResult对于非零数量的处理工人,您还需要实施延迟处理。


推荐阅读