scala - 仅在处理完所有消息后才向 Akka actor 询问结果
问题描述
我试图将一大块文本分成多个段落,并通过调用外部 API 来同时处理它。每次响应来自该段落的 API 时,都会更新一个不可变列表。
处理完段落并更新列表后,我想向 Actor 询问最终状态,以便在接下来的步骤中使用。
以下方法的问题是我永远不知道何时处理所有段落。一旦处理完所有段落并且列表是最终的,我需要取回targetStore 。
def main(args: Array[String]) {
val source = Source.fromFile("input.txt")
val extDelegator = new ExtractionDelegator()
source.getLines().foreach(line => extDelegator.processParagraph(line))
extDelegator.getFinalResult()
}
case class Extract(uuid: UUID, text: String)
case class UpdateList(text: String)
case class DelegateLambda(text: String)
case class FinalResult()
class ExtractionDelegator {
val system = ActorSystem("ExtractionDelegator")
val extActor = system.actorOf(Props(classOf[ExtractorDelegateActor]).withDispatcher("fixed-thread-pool"))
implicit val executionContext = system.dispatchers.lookup("fixed-thread-pool")
def processParagraph(text: String) = {
extActor ! Extract(uuid, text)
}
def getFinalResult(): java.util.List[String] = {
implicit val timeout = Timeout(5 seconds)
val askActor = system.actorOf(Props(classOf[ExtractorDelegateActor]))
val future = askActor ? FinalResult()
val result = Await.result(future, timeout.duration).asInstanceOf[java.util.List[String]]
result
}
def shutdown(): Unit = {
system.terminate()
}
}
/* Extractor Delegator actor*/
class ExtractorDelegateActor extends Actor with ActorLogging {
var targetStore:scala.collection.immutable.List[String] = scala.collection.immutable.List.empty
def receive = {
case Extract(uuid, text) => {
context.actorOf(Props[ExtractProcessor].withDispatcher("fixed-thread-pool")) ! DelegateLambda(text)
}
case UpdateList(res) => {
targetStore = targetStore :+ res
}
case FinalResult() => {
val senderActor=sender()
senderActor ! targetStore
}
}
}
/* Aggregator actor*/
class ExtractProcessor extends Actor with ActorLogging {
def receive = {
case DelegateLambda(text) => {
val res =callLamdaService(text)
sender ! UpdateList(res)
}
}
def callLamdaService(text: String): String = {
//THis is where external API is called.
Thread.sleep(1000)
result
}
}
解决方案
不知道你为什么要在这里使用演员,最简单的就是
// because you call external service, you have back async response most probably
def callLamdaService(text: String): Future[String]
并处理您所做的文本
implicit val ec = scala.concurrent.ExecutionContext.Implicits.global // use you execution context here
Future.sequence(source.getLines().map(callLamdaService)).map {results =>
// do what you want with results
}
如果您仍想使用演员,您可以将其替换callLamdaService
为processParagraph
内部将对ask
工人演员执行的操作,返回结果的人(因此,签名processParagraph
将是def processParagraph(text: String): Future[String]
)
如果您仍然想开始多个任务然后询问结果,那么您只需要使用context.become
with receive(worker: Int)
,当您增加每条消息的工作人员Extract
数量并减少每条消息的工作人员数量时UpdateList
。FinalResult
对于非零数量的处理工人,您还需要实施延迟处理。
推荐阅读
- php - Laravel 发布数据
- javascript - 在 Formik 中提交嵌套表单
- javascript - 使用 ngx-translate 以 Angular 10 转换动态字符串
- mysql - 通过匹配值重新组合数据
- javascript - dc.js:区域选择自动选择它刷过的所有图表的整个图表范围
- android - MvxFragment 进入动画
- laravel-7 - 如何在 Laravel Framework 7.29.3 中解析 laravel ui
- web-testing - 当我没有输入类型 = 文件时如何使用 testRigor 上传文件?
- performance - 我如何决定应该在哪里找到我的 TimesTen 数据库文件?
- react-native - 使用 ListItem 重定向错误:未定义不是对象(评估“navigation.navigate”)