scala - 遍历状态列表直到所有批处理
问题描述
我正在循环所有批次 ID 并在循环中轮询“待处理”以外的状态。
因此,如果批处理状态为成功/失败,则它存在并从 ListBuffer 中删除元素,以便它不会再次处理它。
- 我怎样才能让它更优雅
- 该列表可能包含 None 。那么如何只处理具有值的元素。
数据类型 : Option[ (String,String,Long)] //批次ID、状态和唯一ID 示例:List (Some("batch1","Pending",123) , None , Some("batch2","success" ,124).)
val lstBuffer=StatusList.to[ListBuffer]
var lstBuffer1 = lstBuffer.map(k => (k.get._1, pollStatus(k.get._1) ,k._3))
do {
for ( i<- lstBuffer1){
if (i._2 != "success"){
lst1 -= i
println(s"${i._1} succeeded for ${i._3}")
}
if (i._2 != "failed"){
lst1 -= i
println(s"${i._1} failed for ${i._3}")
}
}
lst1 = lstBuffer1.map(k => (k._1, pollStatus(k._1) ,k._3))
}while(lst1.isEmpty)
解决方案
使它成为Future
:
def awaitCompletion(
batchId: String,
id: String
): Future[Boolean] = Future(pollStatus(batchId)).flatMap {
case "success" =>
print(s"$batchId succeeded for $id")
Future.successful(True)
case "failed" =>
print(s"$batchId failed for $id")
Future.successful(False)
case _ => awaitCompletion(batchId, id)
}
现在,假设batches
是一个元组列表(batchid, id)
(我不知道为什么你需要一个Option
,但如果你出于某种原因确实有它,只需batches
用batches.flatten
下面的替换),你可以执行以下操作:
Await.ready(
Future.traverse(batches)(awaitCompletion.tupled), 1.hour
)
这里有一些可以改进的地方(例如,像这样不断地轮询可能不是一个好主意,你应该有某种退避策略......也许,使用带有延迟的计时器而不是pollStatus
直接调用) ,但这应该给你基本的想法,让你开始。
针对评论中的问题进行更新,这是一个基于 java 的无阻塞延迟执行的简单实现Timer
(如果您使用像 akka 或 play 之类的 scala 框架,则最好使用它们的“本机”实现,但是在“vanilla” scala 程序中,可以这样做):
object Delayed {
import java.util.{Timer, TimerTask}
private val timer = new Timer
def apply[T](delay: Duration)(task: => T): Future[T] = {
val promise = Promise[T]()
val tt = new TimerTask {
override def run(): Unit = promise.success(task)
}
timer.schedule(tt, delay.toMillis)
promise.future
}
}
有了这个,您可以简单地将上面的替换为Future(pollStatus(batchId))
在每次轮询之前添加 100 毫秒的延迟。awaitCompletion
Delayed(100.millis)(pollStatus(batchId))
推荐阅读
- material-ui - material-ui 创建许多重复的类
- curl - 带有用户名/密码身份验证的 curl 失败
- c# - 异步任务中的 ASP.NET EF Core 连接问题
- vbscript - 列出文件/子文件的名称、大小和创建日期?
- c# - clickonce 应用程序在调用服务时出现网关超时,但使用资源管理器可以访问该服务
- javascript - AddEventListener 在错误的时间工作
- docker - 容器无法连接到 docker-compose 中的 microsoft sql server
- c++ - ia 不留眼睛 Player ue4 C++
- api - 如何读取 RDD 元素(字典格式的列表)并将其转换为 Pyspark API 可读的元组?
- microsoft-graph-api - Power Automate HTTP 错误 - {\"ErrorCode\":\"Forbidden\",\"Message\"