首页 > 解决方案 > Scala/Akka 拉模式 - 经理从工人那里获得更多工作

问题描述

我正在为我的爱好项目寻求帮助。我正在尝试找到一种更好的方式,即我的拉模式中的管理器将如何处理任务列表更新(需要完成的工作列表)

我已经实现了一个拉模式,其中有一个工作工厂、经理和工人。Manager 在 akka 调度程序的帮助下从 Job Factory 获得新的工作。每当有新工作时,Worker 就会收到通知并开始咀嚼。

在我的实现中可能有一个怪癖是 Worker 产生了必须完成的新任务。目前,我已经通过从 Worker 到自身的递归消息解决了这个问题。

这是一个非常简单的想法:

class Worker extends Actor {
  def receive = {
    case Work0 ⇒ self ! Work1
    case Work1 ⇒ // ...
  }
}

但是,我不喜欢这种方法。我希望在 Manager 端扩展已经存在的任务列表,并让工作人员在出现新任务时向 Manager 发送新工作。

class Worker extends Actor {
  def receive = {
    case Work0 ⇒ Manager ! Work1
    case Work1 ⇒ // ...
  }
}

这是一个如何在能够动态更改的 Manager 内创建工作缓冲区的解决方案。每当 Worker 向 Manager 发送工作时,它将被添加到缓冲区的顶部。最终 Worker 将循环遍历管理器在缓冲区内的所有任务。

该示例没有任何拉模式本身的管道,这个想法只是为了测试如何让缓冲区工作。

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

class Manager extends Actor {
  var iterator: Iterator[Int] = Iterator.empty
  var buffer: Option[mutable.ArrayBuffer[Int]] = None
  var iteratorCounter: Int = 0

  def receive = {
    case MyBuffer(workBuffer: mutable.ArrayBuffer[Int]) ⇒
      buffer = Some(workBuffer)
      iterator = workBuffer.iterator
    case "iterate" ⇒
      if (iterator.hasNext) {
        iterator.next() // This will be sent to worker as a Task to process
        iteratorCounter += 1
      } else if (buffer.get.length > iteratorCounter) {
        iterator = buffer.get.iterator
        iterator = iterator.drop(iteratorCounter)
      } else {
        iteratorCounter = 0
      }
    case "add" ⇒ // "Worker adds" new stuff to Manager
      val random = scala.util.Random
      val newVal = random.nextInt(100)
      val append = (buf: Option[ArrayBuffer[Int]], element: Int) => Some(buf.get += element)
      buffer = append(buffer, newVal)

    case _ => println("huh?")
  }
}

case class MyBuffer(workBuffer: ArrayBuffer[Int])
val buffer = mutable.ArrayBuffer(1,2,3,4,5,6,7,8,9,10)

val system = ActorSystem("PullPattern")
val helloActor = system.actorOf(Props(new Manager))
helloActor ! MyBuffer(buffer)
var a = 0

for (a <- 1 to 5) {
  helloActor ! "iterate"  
}

a = 0
for (a <- 1 to 3) {
  helloActor ! "add"  
}
a = 0
for (a <- 1 to 10) {
  helloActor ! "iterate"  
}

Scalafiddle链接。

我的问题:

  1. 这是应该如何在 akka 演员中创建一个动态变化的工作缓冲区吗?也许有更好的方法来解决这个问题?
  2. 如果这不是最糟糕的想法,那么实施呢?拥有一个 iteratorCounter 对我来说有点奇怪。询问迭代器长度会移动其指针。有没有其他方法可以解决这个问题?
  3. 也许在工人能够产生新任务的工人拉动关系中,经理根本不应该有一个变化的缓冲区?目前,我无法看到此解决方案有任何问题,但我可能没有提出正确的问题。

标签: scaladesign-patternsakka

解决方案


  1. 而不是mutable.ArrayBufferand Iterator,我建议使用immutable.Queue来排队工作并将enqueue/dequeue工作推入/拉出队列。从您的示例代码看来,您正在以 FIFO 方式消耗工作,因此队列应该最适合您的需要。

  2. 我不建议使用Iterator(可变的)作为指针。特别是,除了在next/hasNext对迭代器调用方法后使用迭代器是不安全的(参见Scala 文档)。使用 FIFO 队列,将不需要迭代器。

  3. 通常有一个管理角色来维护一个内部队列/映射/等来跟踪工作角色的任务。只要适用,我会选择一个不可变集合而不是可变集合。如有必要,将不可变集合设为 a ,private var如果您想避免任何.hot-swapvar

回复:具有使用 Akka 演员的拉模型的工人系统,这是一篇不错的文章。Lightbend 还有一个基于拉模型的分布式工作系统示例应用程序。


推荐阅读