scala - Scala/Akka 拉模式 - 经理从工人那里获得更多工作
问题描述
我正在为我的爱好项目寻求帮助。我正在尝试找到一种更好的方式,即我的拉模式中的管理器将如何处理任务列表更新(需要完成的工作列表)
我已经实现了一个拉模式,其中有一个工作工厂、经理和工人。Manager 在 akka 调度程序的帮助下从 Job Factory 获得新的工作。每当有新工作时,Worker 就会收到通知并开始咀嚼。
在我的实现中可能有一个怪癖是 Worker 产生了必须完成的新任务。目前,我已经通过从 Worker 到自身的递归消息解决了这个问题。
这是一个非常简单的想法:
class Worker extends Actor {
def receive = {
case Work0 ⇒ self ! Work1
case Work1 ⇒ // ...
}
}
但是,我不喜欢这种方法。我希望在 Manager 端扩展已经存在的任务列表,并让工作人员在出现新任务时向 Manager 发送新工作。
class Worker extends Actor {
def receive = {
case Work0 ⇒ Manager ! Work1
case Work1 ⇒ // ...
}
}
这是一个如何在能够动态更改的 Manager 内创建工作缓冲区的解决方案。每当 Worker 向 Manager 发送工作时,它将被添加到缓冲区的顶部。最终 Worker 将循环遍历管理器在缓冲区内的所有任务。
该示例没有任何拉模式本身的管道,这个想法只是为了测试如何让缓冲区工作。
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
class Manager extends Actor {
var iterator: Iterator[Int] = Iterator.empty
var buffer: Option[mutable.ArrayBuffer[Int]] = None
var iteratorCounter: Int = 0
def receive = {
case MyBuffer(workBuffer: mutable.ArrayBuffer[Int]) ⇒
buffer = Some(workBuffer)
iterator = workBuffer.iterator
case "iterate" ⇒
if (iterator.hasNext) {
iterator.next() // This will be sent to worker as a Task to process
iteratorCounter += 1
} else if (buffer.get.length > iteratorCounter) {
iterator = buffer.get.iterator
iterator = iterator.drop(iteratorCounter)
} else {
iteratorCounter = 0
}
case "add" ⇒ // "Worker adds" new stuff to Manager
val random = scala.util.Random
val newVal = random.nextInt(100)
val append = (buf: Option[ArrayBuffer[Int]], element: Int) => Some(buf.get += element)
buffer = append(buffer, newVal)
case _ => println("huh?")
}
}
case class MyBuffer(workBuffer: ArrayBuffer[Int])
val buffer = mutable.ArrayBuffer(1,2,3,4,5,6,7,8,9,10)
val system = ActorSystem("PullPattern")
val helloActor = system.actorOf(Props(new Manager))
helloActor ! MyBuffer(buffer)
var a = 0
for (a <- 1 to 5) {
helloActor ! "iterate"
}
a = 0
for (a <- 1 to 3) {
helloActor ! "add"
}
a = 0
for (a <- 1 to 10) {
helloActor ! "iterate"
}
我的问题:
- 这是应该如何在 akka 演员中创建一个动态变化的工作缓冲区吗?也许有更好的方法来解决这个问题?
- 如果这不是最糟糕的想法,那么实施呢?拥有一个 iteratorCounter 对我来说有点奇怪。询问迭代器长度会移动其指针。有没有其他方法可以解决这个问题?
- 也许在工人能够产生新任务的工人拉动关系中,经理根本不应该有一个变化的缓冲区?目前,我无法看到此解决方案有任何问题,但我可能没有提出正确的问题。
解决方案
而不是
mutable.ArrayBuffer
andIterator
,我建议使用immutable.Queue
来排队工作并将enqueue/dequeue
工作推入/拉出队列。从您的示例代码看来,您正在以 FIFO 方式消耗工作,因此队列应该最适合您的需要。我不建议使用
Iterator
(可变的)作为指针。特别是,除了在next/hasNext
对迭代器调用方法后使用迭代器是不安全的(参见Scala 文档)。使用 FIFO 队列,将不需要迭代器。通常有一个管理角色来维护一个内部队列/映射/等来跟踪工作角色的任务。只要适用,我会选择一个不可变集合而不是可变集合。如有必要,将不可变集合设为 a ,但
private var
如果您想避免任何.hot-swap
var
回复:具有使用 Akka 演员的拉模型的工人系统,这是一篇不错的文章。Lightbend 还有一个基于拉模型的分布式工作系统示例应用程序。
推荐阅读
- python - Scikit-Learn 包装器和 RandomizedSearchCV:RuntimeError
- javascript - 使用正则表达式验证电子邮件,其中域部分中的扩展数可能会有所不同
- android - 5分钟后自动注销应用程序在xamarin表单中处于终止状态或后台状态?
- reactjs - Suspense 数据获取中的 UseRef
- pandas - 如何将数据框文本列拆分为布尔列
- excel - 尝试使用 VBA 对一行中的值求和,但无法解决代码中的错误
- javascript - 如何修复 `data-rbd-draggable-context-id` 不匹配。服务器:“1”客户端:“0”,带有 react-beautiful-dnd 和 next.js
- django - 复选框 Django ModelForm 列表
- java - MySQL 不会显示数据库中的用户,即使它们存在
- javascript - 问题接收访问令牌