scala - Akka Stream 源队列:如何限制在具有背压的队列中的插入(来自可遍历的接口)
问题描述
我有一个背压队列,一个读取队列的流和一个在后台为这个队列提供数据的进程。(队列的消耗在队列馈送结束之前开始)。
馈送是从读取数据库的可遍历接口完成的。我在这个界面上唯一的方法是 foreach。我正在像这样喂队列:
def sourceFromTraversable[T](traversable: Traversable[T]) = {
val (queue, source) = Source.queue[T](queueSize, OverflowStrategy.backpressure).preMaterialize()
Future {
traversable.foreach{
element =>
Await.result(queue.offer(element), Duration.Inf)
}
}.map(_ => queue.complete())
source
}
我使用 Await.result 是因为如果我不这样做,背压就不会应用于可遍历对象,即它会过多地拉取数据库,因为它将并行运行很多 Futures 而不是等待。
有没有一种更简洁的方法(没有 Await.result)可以通过像可遍历的接口来实现这一点?
谢谢
解决方案
推荐阅读
- google-apps-script - 如何让谷歌电子表格每天(同一天)刷新自己?
- c# - 布尔值的按位比较是否被认为是错误的?
- java - 在 Java 中,如何有效地从字节数组的开头和结尾修剪 0
- python - 如何检查我的字典中只有一个值被填充?
- .net - 无法从传输连接读取数据:连接尝试失败,因为连接方
- python - 强制chrome web驱动加载翻译成英文的日文网站
- iis - SignalR 与 NetCore,IIS 连接失败并在部署时不时出现超时或 404
- github - 是否可以为 GitHub 操作创建全局工作流?
- excel - 时间值被复制粘贴为小数
- python - 任何数字的数字总和,直到总和为一位数