scala - 如何从期货序列中仅读取成功值
问题描述
我正在学习 akka/scala 并试图只阅读那些Future
从 a 成功Seq[Future[Int]]
但无法正常工作的 s。
- 我模拟了一个由 10 个组成的数组,
Future[Int]
其中一些失败取决于取值FailThreshold
(10 个都失败,0 个都失败)。 - 然后我尝试将它们读入一个 ArrayBuffer (找不到返回不可变结构和值的方法)。
- 此外,没有成功/失败过滤器,因此必须
onComplete
在每个未来运行一个过滤器并更新缓冲区作为副作用。 - 即使
FailThreshold=0
和 Seq 将所有 Future 设置为 Success,数组缓冲区有时也是空的,并且不同的运行返回不同大小的数组。
我尝试了一些来自网络的其他建议,比如Future.sequence
在列表中使用,但是如果任何未来的变量失败,这会引发异常。
import akka.actor._
import akka.pattern.ask
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Timeout, Failure, Success}
import concurrent.ExecutionContext.Implicits.global
case object AskNameMessage
implicit val timeout = Timeout(5, SECONDS)
val FailThreshold = 0
class HeyActor(num: Int) extends Actor {
def receive = {
case AskNameMessage => if (num<FailThreshold) {Thread.sleep(1000);sender ! num} else sender ! num
}
}
class FLPActor extends Actor {
def receive = {
case t: IndexedSeq[Future[Int]] => {
println(t)
val b = scala.collection.mutable.ArrayBuffer.empty[Int]
t.foldLeft( b ){ case (bf,ft) =>
ft.onComplete { case Success(v) => bf += ft.value.get.get }
bf
}
println(b)
}
}
}
val system = ActorSystem("AskTest")
val flm = (0 to 10).map( (n) => system.actorOf(Props(new HeyActor(n)), name="futureListMake"+(n)) )
val flp = system.actorOf(Props(new FLPActor), name="futureListProcessor")
// val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.failed( throw new IllegalArgumentException("DONE!") ))
val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.successful(0))
val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
flp ! seqOfFtrs
在大多数receive
情况下FLPActor
Vector(Future(Success(0)), Future(Success(1)), Future(Success(2)), Future(Success(3)), Future(Success(4)), Future(Success(5)), Future(Success(6)), Future(Success(7)), Future(Success(8)), Future(Success(9)), Future(Success(10)))
但数组缓冲区b
有不同数量的值,有时是空的。
有人可以指出我这里的差距吗?
- 为什么即使所有 Future 都解析为 Success,数组缓冲区的大小也会不同,
- 当我们想要
ask
使用 TimeOut 的不同参与者并仅使用那些已成功返回以进行进一步处理的请求时,使用什么正确模式。
解决方案
与其直接发送 IndexedSeq[Future[Int]],不如转换为 Future[IndexedSeq[Int]],然后通过管道将其传递给下一个 actor。您不会将期货直接发送给演员。你必须管它。
HeyActor 可以保持不变。
后
val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
进行恢复,并使用 Future.sequence 将其变成一个 Future:
val oneFut = Future.sequence(seqOfFtrs.map(f=>f.map(Some(_)).recover{ case (ex: Throwable) => None})).map(_.flatten)
如果您不了解 Some、None 和 flatten 的业务,请确保您了解 Option 类型。从序列中删除值的一种方法是将序列中的值映射到 Option(Some 或 None),然后展平序列。None 值被删除并且 Some 值被解包。
将数据转换为单个 Future 后,将其通过管道传输到 FLPActor:
oneFut pipeTo flp
FLPActor 应该用以下接收函数重写:
def receive = {
case printme: IndexedSeq[Int] => println(printme)
}
在 Akka 中,从 Future 或 Future 的 onComplete 修改 Actor 的主线程中的某些状态是一个很大的禁忌。在最坏的情况下,它会导致竞争条件。请记住,每个 Future 都在自己的线程上运行,因此在 Actor 中运行 Future 意味着您可以在不同的线程中同时完成工作。让 Future 直接修改您的 Actor 中的某些状态,而 Actor 也在处理某些状态是灾难的根源。在 Akka 中,您直接在主要参与者的执行主线程中处理对状态的所有更改。如果您在 Future 中完成了一些工作,并且需要从 Actor 的主线程访问该工作,您可以将其通过管道传递给该 Actor。pipeTo 模式对于访问 Future 的完成计算来说是功能性的、正确的和安全的。
要回答您关于 FLPActor 为什么没有正确打印出 IndexedSeq 的问题:您在完成 Futures 之前打印出 ArrayBuffer。onComplete 不是在这种情况下使用的正确习惯用法,您通常应该避免使用它,因为它不是好的功能样式。
不要忘记 pipeTo 语法的 import akka.pattern.pipe。
推荐阅读
- parsing - 在解析表达式的评估中将 [IO String] 转换为 IO String
- c++ - 将 int*** 传递给函数
- python - 为什么这种创建目录的尝试会失败?
- deep-learning - DCgAN 仅产生噪声
- ruby-on-rails - AWS Beanstalk 与 AMI1 上的 Ruby 2.6 - 从 Puma 4.3.7 更新到 Puma 5.1.1
- android - 如何从 rxjava 平面图中调用协程用例
- python - 为什么 torchvision.utils.make_grid() 返回所需网格的副本?
- c++ - 出现此错误时我该怎么办:“int”类型的参数与 C++ 中“int(*)[101]”类型的参数不兼容
- sql - 触发以更新没有记录的值
- python - 如何使用 SerializerMethodField 订购查询集?