首页 > 解决方案 > Akka - 用于阻塞操作的固定线程池 - 阻塞调度程序

问题描述

我已经为我的应用程序将要执行的阻塞操作配置了一个调度程序,如下所示:

engine {
  blocking-io-dispatcher {
     type = Dispatcher
     executor = "thread-pool-executor"
     thread-pool-executor {
       fixed-pool-size = 3
     }
       throughput = 1
  }
}

但是,我可以在日志中看到每 5 秒有超过 3 个线程正在处理请求,(线程 6、16、5、7、15)

控制台输出:

system-engine.blocking-io-dispatcher-6
system-engine.blocking-io-dispatcher-16
system-engine.blocking-io-dispatcher-5
system-engine.blocking-io-dispatcher-7
system-engine.blocking- io-dispatcher-15

请在下面找到代码:

import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import java.util.Date
import akka.routing.RoundRobinRouter
import akka.routing.DefaultResizer

class SendPushNotificationActor extends Actor {
   def receive = {
      case SendPushNotificationMessage(message) => {
      println(Thread.currentThread().getName)
      // Simulate blocking operation which takes 5 seconds to send the push notification message
      Thread.sleep(5000)
    }
  }
}

object SendPushNotificationTest {

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("system")

    val sendPushNotificationActor = system.actorOf(Props[SendPushNotificationActor].withDispatcher("engine.blocking-io-dispatcher")
  .withRouter(RoundRobinRouter(nrOfInstances = 5)))

    for (i <- 1.to(100)) {
      sendPushNotificationActor ! SendPushNotificationMessage("Push Notification Message")
    }
  }
}

case class SendPushNotificationMessage(message: String)

任何帮助知道如何使游泳池固定将不胜感激。

标签: multithreadingscalaakka

解决方案


池中的线程可能由于某种原因而死亡,在这种情况下,可以立即产生新线程以替换死线程并使池保持固定大小。新线程被赋予了递增的 ID,这就是您在日志文件中看到的。线程池的大小肯定是固定的。


推荐阅读