首页 > 解决方案 > 重用 Source[ByteString, Any] 的任何方式(不将其全部保存在内存中)

问题描述

有什么方法可以使 Source 可重用?

我有一个 akka-http 服务器,它接收大文件上传,然后通过 HTTP POST 将(分块)数据流式传输到订阅者 websocket 和其他 HTTP 服务器。在这两种情况下,都有一个接受 Source[ByteString, Any] 的 API:

与采用单个 ByteString 的版本相比,使用这些 API 具有一些优势(只需执行单个 HTTP 发布,可以重新创建相同的分块消息等)。

那么有没有办法让这样的工作(不缓冲内存中的所有内容)?

val allSinks: Seq[Sink[Source[ByteString, Any], Future[Done]]] = ???

val g = RunnableGraph.fromGraph(GraphDSL.create(allSinks) { implicit builder => sinks =>
  import GraphDSL.Implicits._

  // Broadcast with an output for each subscriber
  val broadcast = builder.add(Broadcast[DataSource](sinks.size))
  Source.single(source) ~> broadcast
  sinks.foreach(broadcast ~> _)
  ClosedShape
})

标签: akka-streamakka-http

解决方案


资源不可重复使用

不幸的是,aSource在用完后无法重用。数据的底层“源”可以重新用于创建单独的Source值,但每个值最多可以在一个流上运行。

持久性

如果重放功能是必需的,则流式传输的数据将需要存储在持久性机制中,以便以后重放。这种机制可以是文件系统、数据库、Kafka、......

下面是使用文件系统的模型。

传入的POST消息正文可以以写入模式流式传输到文件:

post {
  path(Segment) { fileName =>
    extractRequestEntity { entity =>
      complete {
        entity
          .dataBytes
          .toMat(FileIO.toPath(Paths.get(fileName), Set(CREATE_NEW, WRITE)))(Keep.Right)
          .run()
          .andThen {
            case Success(ioResult) =>
              StatusCodes.Ok -> s"wrote ${ioResult.count} bytes"
            case Failure(ex) =>
              StatusCodes.InternalServerError -> ex.toString
          } 
        }
      }
    }
  }
}

那么就不需要创建一个Broadcast集线器,你只需GET用文件的内容来响应请求:

path(Segment) { fileName =>
  getFromFile(fileName)
}

这利用了这样一个事实,即大多数操作系统将允许您将文件作为字节流写入,同时从文件中读取作为字节流......


推荐阅读