kotlin - 最好地实现对调用层透明地发生的批处理 api 请求的设计模式
问题描述
我有一个批处理器,我想重构它以根据输入以一对一的方式表达,以提高可读性,并在以后进行进一步优化。问题是有一个服务应该被批量调用以减少 HTTP 开销,因此将 1 对 1 代码与批处理代码混合有点棘手,我们可能不希望每次输入都调用该服务。结果可以一个一个急切地发出,但必须保持秩序,所以像流这样的东西似乎行不通。
所以,理想情况下,批处理器看起来像这样:
class Processor<A, B> {
val service: Service<A, B>
val scope: CoroutineScope
fun processBatch(input: List<A>) {
input.map {
Pair(it, scope.async { service.call(it) })
}.map {
(a, b) ->
runBlocking { b.await().let { /** handle result, do something with a if result is null, etc **/ } }
}
}
}
希望以一种在后台执行的方式执行所有服务逻辑,自动将服务的输入分成批处理,异步执行它们,并以某种方式将批处理调用的结果映射到挂起的调用。
这是一个 hacky 实现:
class Service<A, B> {
val inputContainer: MutableList<A>
val outputs: MutableList<B>
val runCalled = AtomicBoolean(false)
val batchSize: Int
suspended fun call(input: A): B? {
// some prefiltering logic that returns a null early
val index = inputContainer.size
inputContainer.add(a) // add to overall list for later batching
return suspend {
run()
outputs[index]
}
}
fun run() {
val batchOutputs = mutableListOf<Deferred<List<B?>>>()
if (!runCalled.getAndSet(true)) {
inputs.chunked(batchSize).forEach {
batchOutputs.add(scope.async { batchCall(it) })
}
runBlocking {
batchOutputs.map {
val res = result.await()
outputs.addAll(res)
}
}
}
}
suspended fun batchCall(input: List<A>): List<B?> {
// batch API call, etc
}
}
这样的事情可能会奏效,但有几个问题:
- 所有 API 调用都会立即发出。理想情况下,这将是在调度其他输入的同时在后台进行批处理和执行,但事实并非如此。
- 在返回所有结果之前,无法恢复对第一个输入的服务结果的处理。理想情况下,如果服务调用返回,我们可以处理结果,而其他结果继续在后台执行。
- 中间结果的容器看起来很hacky并且容易出现错误。还需要清理逻辑,这会在代码的其余部分中引入更多的 hacky 位
我可以考虑对地址 1 和 2 进行一些优化,但我认为与地址 3 相关的问题会更糟。这似乎是一种相当常见的调用模式,我希望有一个库或更简单的设计模式来实现这一点,但我找不到任何东西。任何指导表示赞赏。
解决方案
使用Deferred
. 我会使用的解决方案是:
- 当调用者发出请求时,创建一个
CompletableDeferred
- 使用通道,将其传递
CompletableDeferred
给服务以供稍后完成 - 让调用者挂起,直到服务完成
CompletableDeferred
它可能看起来像这样:
val requestChannel = Channel<Pair<Request, CompletableDeferred<Result>>()
suspend fun doRequest(request: Request): Result {
val result = CompletableDeferred<Result>()
requestChannel.send(Pair(request, result))
return result.await()
}
fun run() = scope.launch {
while(isActive) {
val (requests, deferreds) = getBatch(batchSize).unzip()
val results = batchCall(requests)
(results zip deferreds).forEach { (result, deferred) ->
deferred.complete(result)
}
}
}
suspend fun getBatch(batchSize: Int) = buildList {
repeat(batchSize) {
add(requestChannel.receive())
}
}
推荐阅读
- python - Keras中的词袋嵌入层?
- python - 如何在 Peewee 中使用历史记录表进行版本控制?
- c++ - 为什么不调用继承类的析构函数?
- flutter - 在颤动中获取弧的路径长度
- javascript - 2 个相等变量之间的比较在 Google 脚本(表格)中返回 false
- python - 在类内的默认参数中使用 functools.partial 时的令人惊讶的行为
- excel - 需要有关将 CSV 文件导出到自定义地图的帮助
- laravel - 如何在 laravel php 中从数据库中获取印地语文本,
- python - 如何从数据框中的字典列表中存储特定的键值
- postgresql - 尝试将我的 Elixir 应用程序连接到 VPN 上的 Postgres 银行时出现问题