首页 > 解决方案 > 如何从期货序列中仅读取成功值

问题描述

我正在学习 akka/scala 并试图只阅读那些Future从 a 成功Seq[Future[Int]]但无法正常工作的 s。

  1. 我模拟了一个由 10 个组成的数组,Future[Int]其中一些失败取决于取值FailThreshold(10 个都失败,0 个都失败)。
  2. 然后我尝试将它们读入一个 ArrayBuffer (找不到返回不可变结构和值的方法)。
  3. 此外,没有成功/失败过滤器,因此必须onComplete在每个未来运行一个过滤器并更新缓冲区作为副作用。
  4. 即使FailThreshold=0和 Seq 将所有 Future 设置为 Success,数组缓冲区有时也是空的,并且不同的运行返回不同大小的数组。

我尝试了一些来自网络的其他建议,比如Future.sequence在列表中使用,但是如果任何未来的变量失败,这会引发异常。


import akka.actor._
import akka.pattern.ask
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Timeout, Failure, Success}
import concurrent.ExecutionContext.Implicits.global

case object AskNameMessage
implicit val timeout = Timeout(5, SECONDS) 

val FailThreshold = 0

class HeyActor(num: Int) extends Actor {
    def receive = {
        case AskNameMessage => if (num<FailThreshold) {Thread.sleep(1000);sender ! num} else sender ! num
    }
}

class FLPActor extends Actor {
    def receive = {
        case t: IndexedSeq[Future[Int]] => {
            println(t)
            val b = scala.collection.mutable.ArrayBuffer.empty[Int]
            t.foldLeft( b ){ case (bf,ft) => 
                ft.onComplete { case Success(v) => bf += ft.value.get.get }
                bf
            }
            println(b)

        }
    }
}
val system = ActorSystem("AskTest")
val flm = (0 to 10).map( (n) => system.actorOf(Props(new HeyActor(n)), name="futureListMake"+(n)) )
val flp = system.actorOf(Props(new FLPActor), name="futureListProcessor")

// val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.failed( throw new IllegalArgumentException("DONE!") ))
val delay = akka.pattern.after(500 millis, using=system.scheduler)(Future.successful(0))
val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )
flp ! seqOfFtrs

在大多数receive情况下FLPActor

Vector(Future(Success(0)), Future(Success(1)), Future(Success(2)), Future(Success(3)), Future(Success(4)), Future(Success(5)), Future(Success(6)), Future(Success(7)), Future(Success(8)), Future(Success(9)), Future(Success(10)))

但数组缓冲区b有不同数量的值,有时是空的。

有人可以指出我这里的差距吗?

标签: scalaakkafuture

解决方案


与其直接发送 IndexedSeq[Future[Int]],不如转换为 Future[IndexedSeq[Int]],然后通过管道将其传递给下一个 actor。您不会将期货直接发送给演员。你必须管它。

HeyActor 可以保持不变。

val seqOfFtrs = (0 to 10).map( (n) => Future.firstCompletedOf( Seq(delay, flm(n) ? AskNameMessage) ).mapTo[Int] )

进行恢复,并使用 Future.sequence 将其变成一个 Future:

val oneFut = Future.sequence(seqOfFtrs.map(f=>f.map(Some(_)).recover{ case (ex: Throwable) => None})).map(_.flatten)

如果您不了解 Some、None 和 flatten 的业务,请确保您了解 Option 类型。从序列中删除值的一种方法是将序列中的值映射到 Option(Some 或 None),然后展平序列。None 值被删除并且 Some 值被解包。

将数据转换为单个 Future 后,将其通过管道传输到 FLPActor:

oneFut pipeTo flp

FLPActor 应该用以下接收函数重写:

def receive = {
  case printme: IndexedSeq[Int] => println(printme)
}

在 Akka 中,从 Future 或 Future 的 onComplete 修改 Actor 的主线程中的某些状态是一个很大的禁忌。在最坏的情况下,它会导致竞争条件。请记住,每个 Future 都在自己的线程上运行,因此在 Actor 中运行 Future 意味着您可以在不同的线程中同时完成工作。让 Future 直接修改您的 Actor 中的某些状态,而 Actor 也在处理某些状态是灾难的根源。在 Akka 中,您直接在主要参与者的执行主线程中处理对状态的所有更改。如果您在 Future 中完成了一些工作,并且需要从 Actor 的主线程访问该工作,您可以将其通过管道传递给该 Actor。pipeTo 模式对于访问 Future 的完成计算来说是功能性的、正确的和安全的。

要回答您关于 FLPActor 为什么没有正确打印出 IndexedSeq 的问题:您在完成 Futures 之前打印出 ArrayBuffer。onComplete 不是在这种情况下使用的正确习惯用法,您通常应该避免使用它,因为它不是好的功能样式。

不要忘记 pipeTo 语法的 import akka.pattern.pipe。


推荐阅读