akka-stream - 重用 Source[ByteString, Any] 的任何方式(不将其全部保存在内存中)
问题描述
有什么方法可以使 Source 可重用?
我有一个 akka-http 服务器,它接收大文件上传,然后通过 HTTP POST 将(分块)数据流式传输到订阅者 websocket 和其他 HTTP 服务器。在这两种情况下,都有一个接受 Source[ByteString, Any] 的 API:
- HttpEntity(..., source) 在 HTTP POST 的情况下
- Websocket 的 BinaryMessage(source)
与采用单个 ByteString 的版本相比,使用这些 API 具有一些优势(只需执行单个 HTTP 发布,可以重新创建相同的分块消息等)。
那么有没有办法让这样的工作(不缓冲内存中的所有内容)?
val allSinks: Seq[Sink[Source[ByteString, Any], Future[Done]]] = ???
val g = RunnableGraph.fromGraph(GraphDSL.create(allSinks) { implicit builder => sinks =>
import GraphDSL.Implicits._
// Broadcast with an output for each subscriber
val broadcast = builder.add(Broadcast[DataSource](sinks.size))
Source.single(source) ~> broadcast
sinks.foreach(broadcast ~> _)
ClosedShape
})
解决方案
资源不可重复使用
不幸的是,aSource
在用完后无法重用。数据的底层“源”可以重新用于创建单独的Source
值,但每个值最多可以在一个流上运行。
持久性
如果重放功能是必需的,则流式传输的数据将需要存储在持久性机制中,以便以后重放。这种机制可以是文件系统、数据库、Kafka、......
下面是使用文件系统的模型。
传入的POST
消息正文可以以写入模式流式传输到文件:
post {
path(Segment) { fileName =>
extractRequestEntity { entity =>
complete {
entity
.dataBytes
.toMat(FileIO.toPath(Paths.get(fileName), Set(CREATE_NEW, WRITE)))(Keep.Right)
.run()
.andThen {
case Success(ioResult) =>
StatusCodes.Ok -> s"wrote ${ioResult.count} bytes"
case Failure(ex) =>
StatusCodes.InternalServerError -> ex.toString
}
}
}
}
}
}
那么就不需要创建一个Broadcast
集线器,你只需GET
用文件的内容来响应请求:
path(Segment) { fileName =>
getFromFile(fileName)
}
这利用了这样一个事实,即大多数操作系统将允许您将文件作为字节流写入,同时从文件中读取作为字节流......
推荐阅读
- python - 在 XLS 或 CSV 文件中写入具有不同颜色的单元格的一部分
- javascript - 使用两个嵌套的 for 结构来记录以下模式。有任何想法吗?
- javascript - 如何模拟手机框架在我的网站上播放视频?
- google-cloud-platform - 如何为在谷歌云平台中创建的实例激活 https
- c - 如何在 Gtk 中组合加速键?
- unit-testing - 如何在 Corda 4 中使用“MockServices”进行单元测试?
- apache-poi - Apache POI XSSFSimpleShape 矩形和三角形
- node.js - verifyPasswordResetCode 不是函数 firebase Admin SDK
- javascript - 添加动态html行后如何选择文本输入
- flutter - 在颤动中将嵌套列表保存到数据库中