首页 > 解决方案 > 最好地实现对调用层透明地发生的批处理 api 请求的设计模式

问题描述

我有一个批处理器,我想重构它以根据输入以一对一的方式表达,以提高可读性,并在以后进行进一步优化。问题是有一个服务应该被批量调用以减少 HTTP 开销,因此将 1 对 1 代码与批处理代码混合有点棘手,我们可能不希望每次输入都调用该服务。结果可以一个一个急切地发出,但必须保持秩序,所以像流这样的东西似乎行不通。

所以,理想情况下,批处理器看起来像这样:

class Processor<A, B> {
    val service: Service<A, B>
    val scope: CoroutineScope
    fun processBatch(input: List<A>) {
        input.map {
            Pair(it, scope.async { service.call(it) })
        }.map {
            (a, b) ->
            runBlocking { b.await().let { /** handle result, do something with a if result is null, etc **/ } }
        }
    }
}

希望以一种在后台执行的方式执行所有服务逻辑,自动将服务的输入分成批处理,异步执行它们,并以某种方式将批处理调用的结果映射到挂起的调用。

这是一个 hacky 实现:

class Service<A, B> {
    val inputContainer: MutableList<A>
    val outputs: MutableList<B>
    val runCalled = AtomicBoolean(false)
    val batchSize: Int

    suspended fun call(input: A): B? {
        // some prefiltering logic that returns a null early
        val index = inputContainer.size
        inputContainer.add(a) // add to overall list for later batching
        return suspend {
            run()
            outputs[index]
        }
    }

   fun run() {
        val batchOutputs = mutableListOf<Deferred<List<B?>>>()
        if (!runCalled.getAndSet(true)) {
            inputs.chunked(batchSize).forEach {
                batchOutputs.add(scope.async { batchCall(it) })
            }
            runBlocking {
                batchOutputs.map {
                    val res = result.await()
                    outputs.addAll(res)
                }
            }
        } 

    }

    suspended fun batchCall(input: List<A>): List<B?> {
        // batch API call, etc
    }
}

这样的事情可能会奏效,但有几个问题:

  1. 所有 API 调用都会立即发出。理想情况下,这将是在调度其他输入的同时在后台进行批处理和执行,但事实并非如此。
  2. 在返回所有结果之前,无法恢复对第一个输入的服务结果的处理。理想情况下,如果服务调用返回,我们可以处理结果,而其他结果继续在后台执行。
  3. 中间结果的容器看起来很hacky并且容易出现错误。还需要清理逻辑,这会在代码的其余部分中引入更多的 hacky 位

我可以考虑对地址 1 和 2 进行一些优化,但我认为与地址 3 相关的问题会更糟。这似乎是一种相当常见的调用模式,我希望有一个库或更简单的设计模式来实现这一点,但我找不到任何东西。任何指导表示赞赏。

标签: kotlinbatch-processingkotlin-coroutines

解决方案


使用Deferred. 我会使用的解决方案是:

  1. 当调用者发出请求时,创建一个CompletableDeferred
  2. 使用通道,将其传递CompletableDeferred给服务以供稍后完成
  3. 让调用者挂起,直到服务完成CompletableDeferred

它可能看起来像这样:

val requestChannel = Channel<Pair<Request, CompletableDeferred<Result>>()

suspend fun doRequest(request: Request): Result {
    val result = CompletableDeferred<Result>()
    requestChannel.send(Pair(request, result))
    return result.await()
}

fun run() = scope.launch {
    while(isActive) {
        val (requests, deferreds) = getBatch(batchSize).unzip()
        val results = batchCall(requests)
        (results zip deferreds).forEach { (result, deferred) ->
            deferred.complete(result)
        }
    }
}

suspend fun getBatch(batchSize: Int) = buildList {
    repeat(batchSize) {
        add(requestChannel.receive())
    }
}

推荐阅读