首页 > 解决方案 > 如何在 Reactor 3 中跨多个发布者排队工作?

问题描述

我正在创建一个库,用于使用 Reactor 3 创建数据处理工作流。每个任务都有一个输入通量和一个输出通量。输入通量由用户提供。输出通量由库创建。任务可以链接起来形成一个 DAG。像这样:(它在 Kotlin 中)

val base64 = task<String, String>("base64") {
    input { Flux.just("a", "b", "c", "d", "e") }
    outputFn { ... get the output values ... }
    scriptFn { ... do some stuff ... }
}

val step2 = task<List<String>, String>("step2") {
    input { base64.output.buffer(3) }
    outputFn { ... }
    scriptFn { ... }
}

我需要限制整个工作流程的并发性。一次只能处理配置数量的输入。在上面的示例中,限制为 3 这意味着任务 base64 将首先使用输入“a”、“b”和“c”运行,然后等待每个完成,然后再处理“d”、“e”和“第 2 步”任务。

从输入通量创建输出通量时,如何应用这些限制?可以以某种方式应用 TopicProcessor 吗?也许某种自定义调度程序或处理器?背压如何工作?我需要担心创建缓冲区吗?

标签: project-reactor

解决方案


背压从最终的订阅者向上传播,跨越整个链。但是链中的操作员可以提前请求数据(预取)甚至“重写”请求。例如,buffer(3)如果该操作员收到 a request(1),它将执行request(3)上游(“1 个缓冲区 == 最多 3 个元素,因此我可以请求足够的源以填充我被请求的 1 个缓冲区”)。

如果输入总是由用户提供,这将很难抽象掉......

没有简单的方法来限制跨多个管道甚至多个订阅给定管道 (a Flux) 的源。

使用Scheduler多个共享publishOn将不起作用,因为publishOn选择一个Worker线程并坚持它。

但是,如果您的问题更具体地是关于base64任务受限,也许可以从flatMap的并发参数中获得效果?

input.flatMap(someString -> asyncProcess(someString), 3, 1);

这将允许最多 3 次asyncProcess运行,并且每次终止时,它都会从 的下一个值开始一个新的运行input


推荐阅读