amazon-web-services - 通过 Akka HTTP 或 Play 以 zip 格式从 S3 流式下载多个文件
问题描述
我有一个 S3 结构,它是 Spark 作业的结果,该作业写入如下所示的分区 CSV 文件。
bucketA
output
cleaned-data1
part000....csv
part001....csv
part002....csv
cleaned-data2
.....
我需要的是能够拥有一个指向输出文件名的 Akka HTTP 端点,以将所有部分下载为 zip 文件:https://..../download/cleaned-data1
.
调用此端点时,理想情况下我想:
打开从服务器到客户端浏览器的 zip 流
打开零件文件并将字节直接流式传输到 zip 流到客户端,而不在服务器上进行任何缓冲以避免内存问题
所有部分的总大小可以达到 30GB 未压缩。
有没有办法通过 Akka Stream、Akka HTTP 或 Play 做到这一点?我可以使用 Alpakka 库吗?
根据拉蒙的回答临时编辑:
def bucketNameToFileContents(bucket : String) : Source[ByteString, _] =
bucketNameToKeySource(bucket)
.map(key => S3.download(bucket, key))
.map(x => x.map(y => y.fold(Source.empty[ByteString])(_._1)))
.flatMapConcat(identity)
.flatMapConcat(identity)
解决方案
type Key = String
def bucketNameToKeySource(bucket : String) : Source[Key, _] =
S3.listBucket(bucket, None)
.map(_.key)
这现在可以与S3 下载功能相结合,并且flatMapConcat
:
def bucketNameToFileContents(bucket : String) : Source[ByteString, _] =
bucketNameToKeySource(bucket)
.map(key => S3.download(bucket, key))
.map(_.getOrElse(Source.empty[ByteString])
.flatMapConcat(identity)
此功能现在可以合并到您的Route
. 该问题要求“打开从服务器到客户端的 zip 流”,因此encodeResponse
使用:
def bucketNameToRoute(parentBucketName : String) : Route =
encodeResponse {
path ("download" / Segment) { childBucketName =>
val bucketName = parentBucketName + "/" + childBucketName
val byteStrSource = bucketNameToFileContents(bucketName)
complete(OK -> byteStrSource)
}
}
推荐阅读
- sql - perl - 缓慢解析并发送到数据库
- javascript - Dialog 组件中的动态内容
- javascript - 如何阻止从 iframe 发出的请求
- python - 如何在pygame中让球从三角形反弹?
- c# - 网络核心 IOptions
利用 - amazon-ec2 - 无法访问安装在 Amazon Ec2 RHEL 上的端口 8081 上的 nexus3 OSS
- jestjs - 在 Jest 中访问捕获的 stderr 输出
- mysql - SqlDataAdapter#Fill:`SelectCommand.connection` 属性尚未初始化
- python - 如何在 SQLAlchemy 中使用 MySQL SOUNDEX 函数
- api - 对 API 更改感到困惑