scala - akka-如何确保将动态数量的参与者的所有响应返回给父参与者?
问题描述
每次我的程序启动时,我都需要创建可变数量的参与者,然后必须确保在一段时间后返回所有响应。这个 链接 为固定数量的演员提供了一个好主意,但动态数量呢?
这是我创建演员并将消息传递给他们的代码:
ruleList = ...
val childActorList: Iterable[ActorRef] = ruleList.map(ruleItem =>
context.actorOf(DbActor.props(ruleItem.parameter1, ruleItem.parameter2)))
implicit val timeout = Timeout(10.second)
childActorList.foreach(childActor =>
childActor ? (tempTableName, lastDate)
)
更新-1
根据@Raman Mishra guides,我更新了我的代码如下,这是父演员中的代码:
override val supervisorStrategy: SupervisorStrategy = {
OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
case exp: SQLException => //Resume;
throw exp
case exp:AskTimeoutException => throw exp
case other: Exception => throw other
}
}
override def receive: Receive = {
case Start(tempTableName, lastDate) => {
implicit val timeout = Timeout(10.second)
ruleList.foreach { ruleItem =>
val childActor = context.actorOf(DbActor.props(ruleItem._1, query = ruleItem._2))
ask(childActor, (tempTableName, lastDate)).mapTo[Seq[Int]]
onComplete {
lastDate)).mapTo[Seq[Int]] onComplete {
case util.Success(res) => println("done" + res + ruleItem._2)
case util.Failure(exp: AskTimeoutException) => println("Failed query:" + ruleItem._2); throw exp
case other => println(other)
}
}
在儿童演员中:
case (brokerTableName, lastDate) => {
Logger("Started query by actor" + self.path.name + ':' +
val repo = new Db()
val res = repo.getAggResult(query = (brokerTableName, lastDate))
val resWrapper = res match {
case elem: Future[Any] => elem
case elem:Any => Future(elem)
}
resWrapper pipeTo self
}
case res:List[Map[Any, Any]] => {
// here final result is send to parent actor
repo.insertAggresults(res, aggTableName) pipeTo context.parent
}
现在,每当我运行主应用程序时,首先,父演员启动并创建子演员并使用 ask 方法向他们发送消息。子角色执行他们的任务,但这里的问题是子角色响应永远不会返回给父角色,并且在每次运行应用程序AskTimeoutException
时都会发生。我怀疑onComplete
方法的使用是否正确。任何帮助将不胜感激。
“更新-2”
我发现问题在于使用context.parent
而不是 sender()。另外,当我将结果的第一部分发送给发件人,并且发件人要求第二部分时,问题已解决,但我不知道这里发生了什么,为什么我不能通过管道发送给自己并将最终结果返回给父母?
这是最后的代码:
在父演员中:
override def receive: Receive = {
case Start(tempTableName, lastDate) => {
println("started: called by remote actor")
implicit val timeout = Timeout(5 second)
ruleList.foreach { ruleItem =>
val childActor = context.actorOf(DbActor.props(ruleItem._1, query = ruleItem._2))
ask(childActor, Broker(tempTableName, lastDate)) onComplete {
// (childActor ? Broker(tempTableName, lastDate)).mapTo[Seq[Int]] onComplete {
case util.Success(res: List[Map[Any, Any]]) => (childActor ? res) onComplete {
case util.Success(res: Seq[Any]) => println("Successfull- Num,ber of documents:" + res.length + " " + ruleItem._2)
case util.Failure(exp: AskTimeoutException) => println("Failed for writing - query:" + ruleItem._2); throw exp
}
case util.Failure(exp: AskTimeoutException) => println("Failed for reading - query :" + ruleItem._2); throw exp
case other => println(other)
}
}
}
}
在儿童演员中:
case (brokerTableName, lastDate) => {
Logger("Started query by actor" + self.path.name + ':' +
val repo = new Db()
val res = repo.getAggResult(query = (brokerTableName, lastDate))
val resWrapper = res match {
case elem: Future[Any] => elem
case elem:Any => Future(elem)
}
resWrapper pipeTo sender()
}
case res:List[Map[Any, Any]] => {
// here final result is send to parent actor
repo.insertAggresults(res, aggTableName) pipeTo sender()
}
解决方案
sender()
回复context.parent
不起作用的原因是,询问创建了一个临时参与者来处理响应。您需要回复这个临时演员:发件人(与父母不同)。
也不清楚该getAggResult
方法是否阻塞。如果是这样,这将无济于事(请参见此处)。
推荐阅读
- c# - 检查 Azure SQL 表中的电子邮件以发送 mvc C#
- coq - 是否可以在不使用反转的情况下在 Coq 中证明 `forall n: nat, le n 0 -> n = 0.`?
- sql - 在 2 个表 SQL 之间的列中查找具有不同值的所有行
- sql - Redshift/SQL 中前 20 个百分位数的计算
- eclipse - 如何将 zip 文件添加到 Eclipse 中的 Junit 测试?
- apache-kafka - 命令式与函数式 kafka 流
- python - 是否有比较两个 DataFrame 并输出不同元素的功能?
- php - 颤振图像上传在真实设备上不起作用它在模拟器上工作
- javascript - 如何使用 JavaScript 在 Microsoft Edge(Ctrl + Shift + T)中禁用“重新打开最后一个选项卡”?
- json - 为什么 Spatie laravel-webhook-client 有效负载仅在生产服务器上为空?