首页 > 解决方案 > 通过 Akka HTTP 或 Play 以 zip 格式从 S3 流式下载多个文件

问题描述

我有一个 S3 结构,它是 Spark 作业的结果,该作业写入如下所示的分区 CSV 文件。

bucketA
  output
    cleaned-data1
      part000....csv
      part001....csv
      part002....csv
    cleaned-data2
      .....

我需要的是能够拥有一个指向输出文件名的 Akka HTTP 端点,以将所有部分下载为 zip 文件:https://..../download/cleaned-data1.

调用此端点时,理想情况下我想:

  1. 打开从服务器到客户端浏览器的 zip 流

  2. 打开零件文件并将字节直接流式传输到 zip 流到客户端,而不在服务器上进行任何缓冲以避免内存问题

所有部分的总大小可以达到 30GB 未压缩。

有没有办法通过 Akka Stream、Akka HTTP 或 Play 做到这一点?我可以使用 Alpakka 库吗?

根据拉蒙的回答临时编辑:

  def bucketNameToFileContents(bucket : String) : Source[ByteString, _] =
    bucketNameToKeySource(bucket)
      .map(key => S3.download(bucket, key))
      .map(x => x.map(y => y.fold(Source.empty[ByteString])(_._1)))
      .flatMapConcat(identity)
      .flatMapConcat(identity)

标签: amazon-web-servicesplayframeworkakka-streamakka-httpalpakka

解决方案


第一步是创建一个桶内容akka流:Source

type Key = String

def bucketNameToKeySource(bucket : String) : Source[Key, _] = 
  S3.listBucket(bucket, None)
    .map(_.key)

这现在可以与S3 下载功能相结合,并且flatMapConcat

def bucketNameToFileContents(bucket : String) : Source[ByteString, _] = 
  bucketNameToKeySource(bucket)
    .map(key => S3.download(bucket, key))
    .map(_.getOrElse(Source.empty[ByteString])
    .flatMapConcat(identity)

此功能现在可以合并到您的Route. 该问题要求“打开从服务器到客户端的 zip 流”,因此encodeResponse使用:

def bucketNameToRoute(parentBucketName : String) : Route = 
  encodeResponse {
    path ("download" / Segment) { childBucketName =>

      val bucketName = parentBucketName + "/" + childBucketName

      val byteStrSource = bucketNameToFileContents(bucketName)

      complete(OK -> byteStrSource)
    } 
  }

推荐阅读