Directive to complete with RandomAccessFile read

Problem description

I have a large data file and respond to GET requests with very small portions of that file, read into an Array[Byte].

The directive is:

get {
  dataRepo.load(param).map(data =>
    complete(
      HttpResponse(
        entity = HttpEntity(myContentType, data),
        headers = List(gzipContentEncoding)
      )
    )
  ).getOrElse(complete(HttpResponse(status = StatusCodes.NoContent)))
}

Where dataRepo.load is a function along the lines of:

val pointers: Option[(Long, Int)] = calculateFilePointers(param)
pointers.map { case (index, length) =>
  val dataReader = new RandomAccessFile(dataFile, "r")
  dataReader.seek(index)
  val data = Array.ofDim[Byte](length)
  dataReader.readFully(data)
  data
}

Is there a more efficient way to pipe the RandomAccessFile read directly back in the response, rather than having to read it fully first?

Tags: scala, akka-stream, akka-http

Solution


Instead of reading the data into a single Array[Byte], you could create an Iterator[Array[Byte]] that reads the requested range of the file one chunk at a time:

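// index and length are the (Long, Int) pair returned by calculateFilePointers
val dataReader = new RandomAccessFile(dataFile, "r")

val chunkSize = 1024
val end = index + length  // exclusive end offset of the requested range

Iterator
  .iterate(index)(_ + chunkSize)  // start offset of each chunk; index is a Long
  .takeWhile(_ < end)
  .map { currentIndex =>
    // The last chunk may be shorter than chunkSize
    val currentBytes =
      Array.ofDim[Byte](Math.min(chunkSize.toLong, end - currentIndex).toInt)

    dataReader.seek(currentIndex)
    dataReader.readFully(currentBytes)

    currentBytes
  }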
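
As a self-contained illustration of the chunked read, here is a minimal sketch that uses only the standard library. The names ChunkedRead and chunks are hypothetical and not part of the original code. The sketch also closes the file after the last chunk, which assumes the consumer drains the iterator completely.

```scala
import java.io.{File, RandomAccessFile}

object ChunkedRead {
  /** Lazily yields `length` bytes starting at `index`, at most `chunkSize` bytes per element.
    * The file is opened when the first chunk is requested and closed after the last one.
    * Caveat: if the consumer abandons the iterator early, or a read fails, the handle is not closed.
    */
  def chunks(file: File, index: Long, length: Int, chunkSize: Int = 1024): Iterator[Array[Byte]] = {
    val end = index + length                          // exclusive end offset of the range
    lazy val reader = new RandomAccessFile(file, "r") // not opened for an empty range

    Iterator
      .iterate(index)(_ + chunkSize)                  // start offset of each chunk
      .takeWhile(_ < end)
      .map { position =>
        // The last chunk may be shorter than chunkSize
        val buffer = Array.ofDim[Byte](math.min(chunkSize.toLong, end - position).toInt)
        reader.seek(position)
        reader.readFully(buffer)
        if (position + buffer.length >= end) reader.close() // last chunk: release the handle
        buffer
      }
  }
}
```

For example, ChunkedRead.chunks(file, 100L, 2500) on a file of at least 2600 bytes would produce three arrays of 1024, 1024 and 452 bytes, and nothing is read until the first element is requested.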

This iterator can now feed an Akka Streams Source:

// Assumes dataRepo.load has been changed to return the Iterator[Array[Byte]] built above
val source : Source[Array[Byte], _] = 
  Source fromIterator (() => dataRepo.load(param))

Which can then feed an HttpEntity:

val byteStrSource : Source[ByteString, _] = source.map(ByteString.apply)

val httpEntity = HttpEntity(myContentType, byteStrSource)
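
Putting the pieces together, the directive could look roughly like the sketch below. It assumes akka-http and akka-stream are on the classpath. StreamingRoute, Loader, entityFor, the "data" path segment, and the application/octet-stream content type are all hypothetical stand-ins for the question's dataRepo.load, route, and myContentType. Because the range length is known up front, the sketch uses the HttpEntity overload that takes a content length, so the response can carry a Content-Length header rather than being sent chunked.

```scala
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, HttpResponse, StatusCodes, UniversalEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.scaladsl.Source
import akka.util.ByteString

object StreamingRoute {
  // Hypothetical loader shape: the byte count of the range plus a factory for its lazy chunk iterator
  type Loader = String => Option[(Long, () => Iterator[Array[Byte]])]

  // Wraps the chunks in a Source; nothing is read from disk until the response is streamed
  def entityFor(length: Long, chunks: () => Iterator[Array[Byte]]): UniversalEntity =
    HttpEntity(
      ContentTypes.`application/octet-stream`, // placeholder for myContentType
      length,                                   // known up front, so Content-Length can be sent
      Source.fromIterator(chunks).map(ByteString(_))
    )

  def route(load: Loader): Route =
    path("data" / Segment) { param =>
      get {
        load(param) match {
          case Some((length, chunks)) => complete(HttpResponse(entity = entityFor(length, chunks)))
          case None                   => complete(StatusCodes.NoContent)
        }
      }
    }
}
```

The reads inside the iterator are still blocking calls, so under heavy load it may be worth running this Source on a dedicated dispatcher.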

Now each request holds roughly one 1024-byte chunk in memory at a time (plus whatever the stream buffers) instead of the full length of the requested range. This makes the server more efficient at handling many concurrent requests, and it lets dataRepo.load return immediately with a lazy Source value instead of a Future. Note that the RandomAccessFile still has to be closed once the last chunk has been read.

