akka-stream - 项目流,每个产生 n{ http request } > merge{response} > wrapInHeaderAndFooter{data} > http-request
问题描述
我正在尝试使用流媒体解决经典的 ETL 问题。我有一批段,每个段都包含有关与该段关联的记录的信息,例如记录数、要检索的 url 等,以发出 http 请求以收集数据。我需要从分页大小为 100 条记录的源中提取记录,合并每个段的记录页面,包装在 xml 页眉和页脚中。现在将每个段的每个 xml 有效负载发送到目标。
{http}
page 1
/ \
seg 1 > page 2 -> merge -> wrapHeaderAndFooter -> http target
/ \ /
/ page n
/
/
batch - seg 2 " -> http target
\ seg n " -> http target
val loadSegment: Flow[Segment, Response, NotUsed] = {
Flow[Segment].mapAsync(parallelism = 5) { segment =>
val pages: Source[ByteString, NotUsed] = pagedPayload(segment).map(page => page.payload)
//Using source concatenation to prepend and append
val wrappedInXML: Source[ByteString, NotUsed] = xmlRootStartTag ++ pages ++ xmlRootEndTag
val httpEntity: HttpEntity = HttpEntity(MediaTypes.`application/octet-stream`, pages)
invokeTargetLoad(httpEntity, request, segment)
}
}
def pagedPayload(segment: Segment): Source[Payload, NotUsed] = {
val totalPages: Int = calculateTotalPages(segment.instanceCount)
Source(0 until totalPages).mapAsyncUnordered(parallelism = 5)(i => {
sendPayloadRequest(request, segment, i).mapTo[Try[Payload]].map(_.get)
})
}
val batch: Batch = someBatch
Source(batch.segments)
.via(loadSegment)
.runWith(Sink.ignore)
.andThen {
case Success(value) => log("success")
case Failure(error) => report(error)
}
有更好的方法吗?我正在尝试使用HttpEntity.Chunked
编码来流式传输页面。有时,由于预热,来自源的第一个请求可能需要更长的时间,并且目标会截断没有数据的流。有没有办法延迟与目标的实际连接,直到我们有第一页在流中?
我会更喜欢做类似下面的事情。如果有可能如何实现方法wrapXMLHeader & toHttpEntity
val splitPages: Flow[BuildSequenceSegment, Seq[PageRequest], NotUsed] = ???
val requestPayload: Flow[Seq[PageRequest], Seq[PageResponse], NotUsed] = ???
val wrapXMLHeader: Flow[Seq[PageResponse], Seq[PageResponse], NotUsed] = ???
val toHttpEntity: Flow[Seq[PageResponse], HttpEntity.Chunked, NotUsed] = ???
val invokeTargetLoad: Flow[HttpEntity.Chunked, RestResponse, NotUsed] = ???
Source(batch.segments)
.via(splitPages)
.via(requestPayload)
.via(wrapXMLHeader)
.via(toHttpEntity)
.via(invokeTargetLoad)
.runWith(Sink.ignore)
解决方案
推荐阅读
- google-apps-script - 脚本帮助移动单元格值
- java - 如何在 Java Project Intellij 中将模块依赖添加到其他模块
- dataframe - 将 Spark DataFrame 转换为 spark.rdd.RDD[(Array[Integer], Array[Integer]) 以计算平均精度
- c# - 这个 SQL 数据表有什么解决方法吗?
- react-native - 通过视图反应原生导航
- python - 由于大量代码,Django 加载页面缓慢
- reactjs - 如何在reactjs中的react-multi-select-component中设置动态选择项
- javascript - 有没有办法从 Vue.js 的列表中只检查一个复选框
- html - 使用 string.replace 编辑后从 json 文件中删除反斜杠字符
- java - 如何在java中将HTTP请求头和正文作为字符串获取?