multithreading - 如何为 Akka HTTP 配置特定的调度程序?
问题描述
我有一个简单的 Akka HTTP 应用程序,它使用了 websocket。我的请求处理程序有阻塞调用(如 JDBC)。所以,我需要使用一些固定大小的线程池来处理这样的代码。
所以,据我了解,我应该使用application.conf(像这样 -https://github.com/mkuthan/example-akka-http/blob/master/src/main/resources/application.conf)。但我不知道如何使用固定的持续线程配置自定义线程池。
当我运行我的应用程序并执行线程转储时,我看到两个线程名称:
- akka-system-akka.actor.default-dispatcher-62
- Routes-akka.actor.default-dispatcher-79
我不明白,这些线程池是什么。
我尝试设置默认线程池,如下所示:
akka {
actor {
default-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 40
}
}
}
}
它的工作方式非常奇怪:
- 每个线程池都有自己的 40 个线程,所以我有 80 个线程。据我了解,每个启动的调度程序都有自己的 40 个线程。这是坏的。
- 它不是 FixedThreadPool -https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)。应用程序启动时。没有任何线程。然后,当应用程序处理了一些请求时,就会产生线程。当一段时间没有传入请求时,线程已经死亡。
解决方案
akka-system-akka
您可能使用两个名称和启动了两个单独的 ActorSystems Routes-akka
,因此您在日志中看到两种类型的线程名称。
thread-pool-executor
是java运行时定义的ThreadPoolExecutor,它不是FixedThreadPool。
ThreadPoolExecutor 将根据 corePoolSize(请参阅 getCorePoolSize())和 maximumPoolSize(请参阅 getMaximumPoolSize())设置的边界自动调整池大小(请参阅 getPoolSize())。当在方法 execute(Runnable) 中提交了一个新任务,并且运行的线程少于 corePoolSize 时,即使其他工作线程处于空闲状态,也会创建一个新线程来处理该请求。如果运行的线程数多于 corePoolSize 但少于 maximumPoolSize,则仅当队列已满时才会创建新线程。通过将 corePoolSize 和 maximumPoolSize 设置为相同,您可以创建一个固定大小的线程池。通过将 maximumPoolSize 设置为基本上无界的值,例如 Integer.MAX_VALUE,您允许池容纳任意数量的并发任务。最典型的是,
因此,您可以看到它以小于定义的大小开始,并根据传入的任务增加线程数。
阅读部分Keep-alive times
解释线程是如何终止的。
来到手头的问题:
不要覆盖default-dispatcher
,因为它被 akka 用于其他目的,即将执行的消息传递给所有参与者,这很容易被错误配置。而且,最重要的是,不要在默认调度程序上运行阻塞任务(阅读akka 文档了解更多详细信息)。而是引入一个单独的调度程序并在那里执行您的阻塞代码。
这是一个如何使用普通 http 请求的示例(对不起,我之前没有使用过 web sockets)
val asyncHandler: HttpRequest => Future[HttpResponse] = { req =>
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
Future {
// blocking call
HttpRespone(???)
}(blockingExecutionContext) // this can be passed implicitly too
}
Http().bindAndHandleAsync(asyncHandler, "localhost")
blocking-dispatcher
必须以application.conf
与您所做的类似的方式进行配置,default-dispatcher
但它是在配置文件的根目录中定义的。
blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2.0
core-pool-size-max = 10
}
}
通常,将您的阻塞执行隔离到单独的执行上下文中,您可以专门为这种类型的阻塞操作进行配置。由于执行任务的类型受到您的限制和控制,因此微调这种方法更容易。
推荐阅读
- python - Schonhage-Strassen 乘法实现错误
- python - Python - 如何将分类器的结果加入 DataFrame 以将其可视化为散点图?
- spring - Spring SFTP集成:我们可以基于单个触发器文件将多个文件从远程复制到本地吗
- ruby - 从 0 开始注入是否与总和相同
- apify - Apify.utils.puppeteer 对象中缺少方法
- python-3.x - TF2.0:翻译模型:恢复保存的模型时出错:检查点(根)中未解析的对象.optimizer.iter:属性
- vb.net - 以编程方式通过 STDIN 向 ffmpeg 发送字节以创建静止图像文件
- swiftui - SwiftUI 模态状态未使用 ObservableObject 重置
- javascript - 从我的 php/html 代码中执行提示命令
- python - 从 4326 到 3857 的 GeoPandas 重投影无法正常工作