首页 > 解决方案 > 遍历状态列表直到所有批处理

问题描述

我正在循环所有批次 ID 并在循环中轮询“待处理”以外的状态。

因此,如果批处理状态为成功/失败,则它存在并从 ListBuffer 中删除元素,以便它不会再次处理它。

  1. 我怎样才能让它更优雅
  2. 该列表可能包含 None 。那么如何只处理具有值的元素。

数据类型 : Option[ (String,String,Long)] //批次ID、状态和唯一ID 示例:List (Some("batch1","Pending",123) , None , Some("batch2","success" ,124).)

val lstBuffer=StatusList.to[ListBuffer]   

var lstBuffer1 = lstBuffer.map(k => (k.get._1, pollStatus(k.get._1) ,k._3)) 

    do {
      for ( i<- lstBuffer1){
        if (i._2 != "success"){
          lst1 -= i
          println(s"${i._1} succeeded for ${i._3}")
        }
        if (i._2 != "failed"){
         lst1 -= i
          println(s"${i._1} failed for ${i._3}")
        }
        
      }
      lst1 = lstBuffer1.map(k => (k._1, pollStatus(k._1) ,k._3))

    }while(lst1.isEmpty)

标签: scala

解决方案


使它成为Future

    def awaitCompletion(
       batchId: String, 
       id: String
    ): Future[Boolean] = Future(pollStatus(batchId)).flatMap { 
       case "success" => 
          print(s"$batchId succeeded for $id")
          Future.successful(True)
       case "failed" => 
          print(s"$batchId failed for $id")
          Future.successful(False)
       case _ => awaitCompletion(batchId, id)
    }

现在,假设batches是一个元组列表(batchid, id)(我不知道为什么你需要一个Option,但如果你出于某种原因确实有它,只需batchesbatches.flatten下面的替换),你可以执行以下操作:

    Await.ready(
       Future.traverse(batches)(awaitCompletion.tupled), 1.hour
    )

这里有一些可以改进的地方(例如,像这样不断地轮询可能不是一个好主意,你应该有某种退避策略......也许,使用带有延迟的计时器而不是pollStatus直接调用) ,但这应该给你基本的想法,让你开始。

针对评论中的问题进行更新,这是一个基于 java 的无阻塞延迟执行的简单实现Timer(如果您使用像 akka 或 play 之类的 scala 框架,则最好使用它们的“本机”实现,但是在“vanilla” scala 程序中,可以这样做):

object Delayed {
  import java.util.{Timer, TimerTask}
  private val timer = new Timer
  def apply[T](delay: Duration)(task: => T): Future[T] = {
    val promise = Promise[T]()
    val tt = new TimerTask {
      override def run(): Unit = promise.success(task)
    }
    timer.schedule(tt, delay.toMillis)
    promise.future
  }
}

有了这个,您可以简单地将上面的替换为Future(pollStatus(batchId))在每次轮询之前添加 100 毫秒的延迟。awaitCompletionDelayed(100.millis)(pollStatus(batchId))


推荐阅读