multithreading - Akka - 用于阻塞操作的固定线程池 - 阻塞调度程序
问题描述
我已经为我的应用程序将要执行的阻塞操作配置了一个调度程序,如下所示:
engine {
blocking-io-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 3
}
throughput = 1
}
}
但是,我可以在日志中看到每 5 秒有超过 3 个线程正在处理请求,(线程 6、16、5、7、15)
控制台输出:
system-engine.blocking-io-dispatcher-6
system-engine.blocking-io-dispatcher-16
system-engine.blocking-io-dispatcher-5
system-engine.blocking-io-dispatcher-7
system-engine.blocking- io-dispatcher-15
请在下面找到代码:
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
import java.util.Date
import akka.routing.RoundRobinRouter
import akka.routing.DefaultResizer
class SendPushNotificationActor extends Actor {
def receive = {
case SendPushNotificationMessage(message) => {
println(Thread.currentThread().getName)
// Simulate blocking operation which takes 5 seconds to send the push notification message
Thread.sleep(5000)
}
}
}
object SendPushNotificationTest {
def main(args: Array[String]): Unit = {
val system = ActorSystem("system")
val sendPushNotificationActor = system.actorOf(Props[SendPushNotificationActor].withDispatcher("engine.blocking-io-dispatcher")
.withRouter(RoundRobinRouter(nrOfInstances = 5)))
for (i <- 1.to(100)) {
sendPushNotificationActor ! SendPushNotificationMessage("Push Notification Message")
}
}
}
case class SendPushNotificationMessage(message: String)
任何帮助知道如何使游泳池固定将不胜感激。
解决方案
池中的线程可能由于某种原因而死亡,在这种情况下,可以立即产生新线程以替换死线程并使池保持固定大小。新线程被赋予了递增的 ID,这就是您在日志文件中看到的。线程池的大小肯定是固定的。
推荐阅读
- css - 具有重复的多个背景图像
- vue.js - 将 v-model 添加到嵌套 v-for 循环中的输入
- scala - 在 spark sql 中使用别名值从现有数据帧创建另一个数据帧
- android - 如何创建华为安卓模拟器
- mysql - 与 sql_mode=only_full_group_by 不兼容的 MySQL 查询
- javascript - 函数的第一次执行总是比第二次慢吗?
- javascript - 过滤json数组上的特定值并创建另一个数组
- django - 无法使用 Python 3.8 在 Windows 10 上安装“psycopg2”
- google-apps-script - 是否可以使用谷歌表格设置带有自动插入值的注释?
- mongodb - 通过从数据库中获取增量数据来触发 Airflow ETL 作业