首页 > 解决方案 > 在 Scala 中使用 Recovery 编写期货

问题描述

我有一个方法,可以执行一些异步操作。每个异步操作都需要恢复。如何以更可组合的方式编写方法?

伪scala代码:

object Test {

  import ExecutionContext.Implicits.global

  case class ObjectData(
                         internal: String,
                         external1: String = "",
                         external2: String = ""
                       )

  case class ObjectInfo(
                         id: String,
                         internal: String,
                         external1: String = "",
                         external2: String = ""
                       )

  def addObject(data: ObjectData): Future[ObjectInfo] = {
    internalActionWithRollback(data.internal) { objectInfo =>
      externalActionWithRollback(objectInfo.id, data.external1) {
        externalActionWithRollback(objectInfo.id, data.external2) {
          Future.successful(
            objectInfo.copy(
              external1 = data.external1,
              external2 = data.external2
            )
          )
        }
      }
    }
  }

  private def internalActionWithRollback[R](internal: String)
                                           (nextAction: ObjectInfo => Future[R]): Future[R] = ???

  private def externalActionWithRollback[R](id: String, external: String)
                                           (nextAction: => Future[R]): Future[R] = ???

}

编辑:

@IvanKurchenko 帮助我解决了这个问题。flatMap我们需要具有和功能的未来包装器map

object TransactFuture {
  implicit class FutureOps[T, R](underling: Future[T]) {
    def rollbackWith(rollback: PartialFunction[Throwable, Future[R]]): TransactFuture[T, R] = {
      new TransactFuture[T, R](underling, rollback)
    }
    def empty: TransactFuture[T, R] = {
      new TransactFuture[T, R](underling, PartialFunction.empty[Throwable, Future[R]])
    }
  }

}

class TransactFuture[T, R](underlying: Future[T], rollback: PartialFunction[Throwable, Future[R]]) {

  private def recoveryInternal[S](implicit ec: ExecutionContext): PartialFunction[Throwable, Future[S]] = {
    case ex: Throwable =>
      val failed = Future.failed[S](ex)
      rollback.lift(ex).fold(failed)(_.flatMap[S](_ => failed))
  }

  def flatMap[S](f: T => Future[S])(implicit ec: ExecutionContext): Future[S] = {
    underlying.flatMap(f).recoverWith(recoveryInternal)
  }
  def map[S](f: T => S)(implicit ec: ExecutionContext): Future[S] = {
    underlying.map(f).recoverWith(recoveryInternal)
  }
}

使用这个包装器,我们可以编写 main 函数:

  def addObject(data: ObjectData): Future[ObjectInfo] = {
    for {
      objectInfo <- addObjectInternal(internal)
      _ <- addExternal(objectInfo.id, data.external1)
        .rollbackWith({
          case _: Throwable => deleteObjectInternal(objectInfo.id)
        })
      _ <- addExternal(objectInfo.id, data.external2)
        .rollbackWith({
          case _: Throwable => deleteExternal(objectInfo.id)
        })
    } yield {
      objectInfo.copy(
        external1 = data.external1,
        external2 = data.external2
      )
    }
  }

标签: scala

解决方案


如果我对您的理解正确,您想在子执行或下一次Future执行失败时调用一些操作。这种行为似乎类似于某种交易。好吧,Future开箱即用不提供这种行为,但可以实现一些包装器:

class TransactFuture[T](underlying: Future[T], rollback: PartialFunction[Throwable, Future[Unit]]) {
  def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
    underlying.flatMap(f).recoverWith {
      case exception: Throwable =>
        val failure = Future.failed[S](exception)
        rollback.lift(exception).fold(failure)(_.flatMap(_ => failure))
    }
  }
}

// Just provides syntax sugar over Future
implicit class FutureOps[T](underling: Future[T]) {
  def rollbackWith(rollback: PartialFunction[Throwable, Future[Unit]]): TransactFuture[T] = {
    new TransactFuture[T](underling, rollback)
  }
}

可以通过以下方式使用:

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

val action = for {
  _ <- Future.successful(println("A executed")).rollbackWith {
    case _: Throwable => Future.successful(println("A recovered"))
  }
  _ <- Future.successful(println("B executed")).rollbackWith {
    case _: Throwable => Future.successful(println("B recovered"))
  }
  _ <- Future.failed(new Exception("C failed"))
} yield ()

Await.result(action, 1 second)

所以结果输出将是:

A executed
B executed
B recovered
A recovered
Exception in thread "main" ......
Caused by: java.lang.Exception: C failed

希望这可以帮助!


推荐阅读