Stream of items, each producing n{ http request } > merge{response} > wrapInHeaderAndFooter{data} > http-request

Problem description

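I am trying to solve a classic ETL problem using streams. I have a batch of segments, and each segment carries information about its associated records, such as the record count and the URL to retrieve them from, which is used to issue the HTTP requests that collect the data. I need to extract the records from a source that paginates at 100 records per page, merge the pages of each segment, and wrap the result in an XML header and footer. The XML payload of each segment is then sent to the target.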


                 {http}
                 page 1
                 /      \
       seg 1 >  page 2  -> merge -> wrapHeaderAndFooter -> http target 
          /      \      /
         /       page n
        / 
       /
batch -  seg 2                    "                     -> http target
       \ seg n                    "                     -> http target

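import akka.NotUsed
import akka.http.scaladsl.model.{HttpEntity, MediaTypes}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString
import scala.util.Try

val loadSegment: Flow[Segment, Response, NotUsed] =
  Flow[Segment].mapAsync(parallelism = 5) { segment =>
    val pages: Source[ByteString, NotUsed] = pagedPayload(segment).map(_.payload)
    // Use source concatenation to prepend the XML start tag and append the end tag
    val wrappedInXML: Source[ByteString, NotUsed] = xmlRootStartTag ++ pages ++ xmlRootEndTag
    // Send the wrapped stream rather than the bare pages, so the header and footer reach the target
    val httpEntity: HttpEntity = HttpEntity(MediaTypes.`application/octet-stream`, wrappedInXML)
    invokeTargetLoad(httpEntity, request, segment)
  }

def pagedPayload(segment: Segment): Source[Payload, NotUsed] = {
  val totalPages: Int = calculateTotalPages(segment.instanceCount)
  // mapAsyncUnordered emits pages in completion order; use mapAsync if page order must be preserved
  Source(0 until totalPages).mapAsyncUnordered(parallelism = 5) { i =>
    // Unwrap the Try so that a failed page fails the stream
    sendPayloadRequest(request, segment, i).mapTo[Try[Payload]].map(_.get)
  }
}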
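The helper calculateTotalPages is not shown in the question. A minimal sketch, assuming the page size of 100 records mentioned above and a non-negative record count, is a ceiling division. The object name Paging, the constant DefaultPageSize and the pageSize parameter are illustrative and not part of the original code.

```scala
object Paging {
  // Page size stated in the question: the source serves 100 records per page.
  val DefaultPageSize: Int = 100

  // Ceiling division, written so that it cannot overflow the way
  // (recordCount + pageSize - 1) / pageSize can for very large counts.
  def calculateTotalPages(recordCount: Int, pageSize: Int = DefaultPageSize): Int = {
    require(recordCount >= 0, "recordCount must be non-negative")
    require(pageSize > 0, "pageSize must be positive")
    recordCount / pageSize + (if (recordCount % pageSize == 0) 0 else 1)
  }
}
```

With this definition a segment of 250 records yields three pages (indices 0, 1 and 2 in `0 until totalPages`), and an empty segment yields none.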

import akka.stream.scaladsl.Sink
import scala.util.{Failure, Success}

val batch: Batch = someBatch
Source(batch.segments)
  .via(loadSegment)
  .runWith(Sink.ignore)
  .andThen {
    case Success(_)     => log("success")
    case Failure(error) => report(error)
  }

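Is there a better way to do this? I am trying to stream the pages using HttpEntity.Chunked encoding. Sometimes, because of warm-up, the first request to the source takes longer, and the target cuts off the stream because no data is arriving. Is there a way to delay the actual connection to the target until the first page is available in the stream?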
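One possible way to hold the connection back, offered as a sketch and not as a confirmed answer, is to run the page stream up to its first element with prefixAndTail(1) and build the entity only once that element has arrived. The helper name entityAfterFirstPage, the object DeferredEntity and the header and footer parameters are hypothetical; the header and footer are assumed to be plain ByteString values.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}
import akka.util.ByteString
import scala.concurrent.{ExecutionContext, Future}

object DeferredEntity {
  // Hypothetical helper: the returned Future completes only once the first page
  // (or the end of an empty page stream) has arrived from the source.
  def entityAfterFirstPage(
      pages: Source[ByteString, NotUsed],
      header: ByteString,
      footer: ByteString
  )(implicit mat: Materializer, ec: ExecutionContext): Future[HttpEntity.Chunked] =
    pages
      .prefixAndTail(1) // emits (first page if any, stream of the remaining pages)
      .runWith(Sink.head)
      .map { case (first, rest) =>
        // Reassemble header, first page, remaining pages and footer in order.
        val body = Source
          .single(header)
          .concat(Source(first))
          .concat(rest)
          .concat(Source.single(footer))
        HttpEntity(ContentTypes.`application/octet-stream`, body)
      }
}
```

Inside the mapAsync stage this could be used as `entityAfterFirstPage(pages, startTag, endTag).flatMap(entity => invokeTargetLoad(entity, request, segment))`, so the request to the target is created only after the first page exists. Two caveats apply: the tail source returned by prefixAndTail can be materialized only once, and it has to be consumed before the stream subscription timeout expires, so the request should be issued promptly after the Future completes.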

I would prefer to do something like the following. If that is possible, how would the methods wrapXMLHeader and toHttpEntity be implemented?

val splitPages: Flow[BuildSequenceSegment, Seq[PageRequest], NotUsed] = ???
val requestPayload: Flow[Seq[PageRequest], Seq[PageResponse], NotUsed] = ???
val wrapXMLHeader: Flow[Seq[PageResponse], Seq[PageResponse], NotUsed] = ???
val toHttpEntity: Flow[Seq[PageResponse], HttpEntity.Chunked, NotUsed] = ???
val invokeTargetLoad: Flow[HttpEntity.Chunked, RestResponse, NotUsed] = ???

Source(batch.segments)
  .via(splitPages)
  .via(requestPayload)
  .via(wrapXMLHeader)
  .via(toHttpEntity)
  .via(invokeTargetLoad)
  .runWith(Sink.ignore)
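As a rough illustration of how the two stages asked about could look with these signatures, the sketch below assumes a hypothetical PageResponse that carries its body as a ByteString and uses a placeholder root element for the header and footer. None of these details appear in the question, and the object name SegmentStages is only for grouping.

```scala
import akka.NotUsed
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.stream.scaladsl.{Flow, Source}
import akka.util.ByteString

object SegmentStages {
  // Hypothetical page type; the question does not show PageResponse.
  final case class PageResponse(payload: ByteString)

  // Placeholder root element; the real header and footer are not given.
  val xmlHeader: ByteString = ByteString("<root>")
  val xmlFooter: ByteString = ByteString("</root>")

  // Prepend the header and append the footer as extra pseudo-pages.
  val wrapXMLHeader: Flow[Seq[PageResponse], Seq[PageResponse], NotUsed] =
    Flow[Seq[PageResponse]].map { pages =>
      (PageResponse(xmlHeader) +: pages) :+ PageResponse(xmlFooter)
    }

  // Turn the already-fetched pages of one segment into a chunked entity.
  val toHttpEntity: Flow[Seq[PageResponse], HttpEntity.Chunked, NotUsed] =
    Flow[Seq[PageResponse]].map { pages =>
      HttpEntity(
        ContentTypes.`application/octet-stream`,
        Source(pages.toList).map(_.payload))
    }
}
```

Because each stream element here is a complete Seq of pages, the entity is built only after every page of the segment has been fetched, so the target is not left waiting on a slow first page. The trade-off is that a whole segment is held in memory instead of being streamed page by page.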

Tags: akka-stream, akka-http

Solution

