首页 > 解决方案 > Akka Stream 源队列:如何限制在具有背压的队列中的插入(来自可遍历的接口)

问题描述

我有一个背压队列,一个读取队列的流和一个在后台为这个队列提供数据的进程。(队列的消耗在队列馈送结束之前开始)。

馈送是从读取数据库的可遍历接口完成的。我在这个界面上唯一的方法是 foreach。我正在像这样喂队列:

 def sourceFromTraversable[T](traversable: Traversable[T]) = {
 val (queue, source) = Source.queue[T](queueSize, OverflowStrategy.backpressure).preMaterialize()
    Future {
      traversable.foreach{
        element =>
          Await.result(queue.offer(element), Duration.Inf)
      }
    }.map(_ => queue.complete())
  source
}

我使用 Await.result 是因为如果我不这样做,背压就不会应用于可遍历对象,即它会过多地拉取数据库,因为它将并行运行很多 Futures 而不是等待。

有没有一种更简洁的方法(没有 Await.result)可以通过像可遍历的接口来实现这一点?

谢谢

标签: scalaakka-stream

解决方案


推荐阅读