首页 > 解决方案 > Apache Http 异步客户端的内容 GZIP 解压缩

问题描述

在经典的 HttpClient 中,GZIP 解压缩是由 ContentCompressionExec 完成的。那么在 HttpAsyncClient 中这是如何实现的?我找不到任何实现此功能的 AsyncExecChainHandler。

标签: apache-httpcomponents apache-httpasyncclient apache-httpclient-5.x

解决方案


我最终用 Scala 自己实现了如下的 AsyncResponseConsumer。

/**
 * Response consumer that produces a [[SimpleHttpResponse]] from the response head plus the byte
 * array delivered by the supplied entity consumer.
 *
 * @param entityConsumer entity consumer passed on to the superclass; its result becomes the body
 */
class SimpleDecompressingResponseConsumer(val entityConsumer: AsyncEntityConsumer[Array[Byte]])
  extends AbstractAsyncResponseConsumer[SimpleHttpResponse, Array[Byte]](entityConsumer) {

  /** Informational responses are ignored; this is a deliberate no-op. */
  override def informationResponse(response: HttpResponse, context: HttpContext): Unit = {}

  /**
   * Copies the response head and, when body bytes are present, attaches them to the copy.
   *
   * @param response    response head to copy
   * @param entity      body bytes from the entity consumer; may be null, in which case no body is set
   * @param contentType content type recorded alongside the body
   * @return the copied response, with the body set if `entity` was non-null
   */
  override protected def buildResult(response: HttpResponse, entity: Array[Byte], contentType: ContentType): SimpleHttpResponse = {
    val result = SimpleHttpResponse.copy(response)
    Option(entity).foreach(body => result.setBody(body, contentType))
    result
  }
}
/**
 * Entity consumer that buffers the whole response body in memory and, once the stream ends,
 * decodes it according to the `Content-Encoding` reported in the entity details.
 *
 * Recognised codings are `gzip`, `x-gzip` and `deflate`, compared case-insensitively; any other
 * (or absent) coding leaves the buffered bytes untouched.
 */
class SimpleAsyncDecompressingEntityConsumer extends AbstractBinDataConsumer with AsyncEntityConsumer[Array[Byte]] {
  @volatile
  private var resultCallback: FutureCallback[Array[Byte]] = _
  // Starts as a pass-through so completed() cannot hit a null function if streamStart was never called.
  private var encoding: Array[Byte] => Array[Byte] = identity
  private var content: Array[Byte] = _

  private val buffer = new ByteArrayBuffer(1024)

  /**
   * Remembers the result callback and selects the decoder for the announced content coding.
   *
   * @param entityDetails  entity metadata; only its content encoding is read
   * @param resultCallback callback notified from `completed()` / `failed()`
   */
  override def streamStart(entityDetails: EntityDetails, resultCallback: FutureCallback[Array[Byte]]): Unit = {
    this.resultCallback = resultCallback
    // Content-coding names are case-insensitive, so normalise before matching.
    val coding = Option(entityDetails)
      .flatMap(details => Option(details.getContentEncoding))
      .map(_.trim.toLowerCase(java.util.Locale.ROOT))
    this.encoding = coding match {
      case Some("gzip") | Some("x-gzip") =>
        bytes => decode(bytes, new GZIPInputStream(_))
      case Some("deflate") =>
        bytes => decode(bytes, new DeflateInputStream(_))
      case _ =>
        identity
    }
  }

  /** Reports the failure to the callback (if one was registered) and drops the buffered bytes. */
  override def failed(cause: Exception): Unit = {
    if (resultCallback != null) resultCallback.failed(cause)
    releaseResources()
  }

  /** @return the decoded body, or null until `completed()` has run successfully */
  override def getContent: Array[Byte] = content

  override def capacityIncrement(): Int = Int.MaxValue

  /**
   * Appends the readable bytes of `src` to the internal buffer; a null `src` is ignored.
   * The array-backed path copies straight from the backing array without advancing `src`.
   */
  override def data(src: ByteBuffer, endOfStream: Boolean): Unit =
    if (src != null) {
      if (src.hasArray) buffer.append(src.array(), src.arrayOffset() + src.position(), src.remaining())
      else while (src.hasRemaining) buffer.append(src.get)
    }

  /**
   * Decodes the buffered bytes, publishes them to the callback (if any) and clears the buffer.
   * The buffer is cleared even when decoding throws.
   */
  override def completed(): Unit =
    try {
      this.content = encoding(buffer.toByteArray)
      if (resultCallback != null) resultCallback.completed(content)
    } finally releaseResources()

  override def releaseResources(): Unit = buffer.clear()

  /**
   * Runs `compressed` through the decoding stream built by `wrap` and returns the decoded bytes.
   * An empty payload is returned unchanged, because the gzip stream constructor would otherwise
   * fail while reading a header that is not there. The decoding stream is always closed.
   */
  private def decode(compressed: Array[Byte], wrap: java.io.InputStream => java.io.InputStream): Array[Byte] =
    if (compressed.isEmpty) compressed
    else {
      val in = wrap(new ByteArrayInputStream(compressed))
      try IOUtils.toByteArray(in)
      finally in.close()
    }
}

推荐阅读