project-reactor - 如何在 Reactor 3 中跨多个发布者排队工作?
问题描述
我正在创建一个库,用于使用 Reactor 3 创建数据处理工作流。每个任务都有一个输入通量和一个输出通量。输入通量由用户提供。输出通量由库创建。任务可以链接起来形成一个 DAG。像这样:(它在 Kotlin 中)
val base64 = task<String, String>("base64") {
input { Flux.just("a", "b", "c", "d", "e") }
outputFn { ... get the output values ... }
scriptFn { ... do some stuff ... }
}
val step2 = task<List<String>, String>("step2") {
input { base64.output.buffer(3) }
outputFn { ... }
scriptFn { ... }
}
我需要限制整个工作流程的并发性。一次只能处理配置数量的输入。在上面的示例中,限制为 3 这意味着任务 base64 将首先使用输入“a”、“b”和“c”运行,然后等待每个完成,然后再处理“d”、“e”和“第 2 步”任务。
从输入通量创建输出通量时,如何应用这些限制?可以以某种方式应用 TopicProcessor 吗?也许某种自定义调度程序或处理器?背压如何工作?我需要担心创建缓冲区吗?
解决方案
背压从最终的订阅者向上传播,跨越整个链。但是链中的操作员可以提前请求数据(预取)甚至“重写”请求。例如,buffer(3)
如果该操作员收到 a request(1)
,它将执行request(3)
上游(“1 个缓冲区 == 最多 3 个元素,因此我可以请求足够的源以填充我被请求的 1 个缓冲区”)。
如果输入总是由用户提供,这将很难抽象掉......
没有简单的方法来限制跨多个管道甚至多个订阅给定管道 (a Flux
) 的源。
使用Scheduler
多个共享publishOn
将不起作用,因为publishOn
选择一个Worker
线程并坚持它。
但是,如果您的问题更具体地是关于base64
任务受限,也许可以从flatMap
的并发参数中获得效果?
input.flatMap(someString -> asyncProcess(someString), 3, 1);
这将允许最多 3 次asyncProcess
运行,并且每次终止时,它都会从 的下一个值开始一个新的运行input
。
推荐阅读
- sql - 如何从表中选择记录并按 ID 以逗号分隔的字符串对其进行排序?
- javascript - 如何在 webview 的 java 代码中使用 javascript 值?
- php - 获取会话 cookie 不会在 Firefox 上发送
- php - 如何在 Symfony 表单的 CollectionType 的每个字段中添加 ChoiceType?
- json - 如何使用 Firebase 快照打印多个 json 对象数据 - 数据拉取成功
- asp.net - AddDbContextPool 不适用于 Azure SQL 的 Azure 广告身份验证
- android - 如何在android中的paing列表中获取排序的时间戳
- python - 如何向特定频道发送消息?
- flutter - 升级到 Android 嵌入 v2 后的 MissingPluginException
- angular - 如何防止在 Angular Firebase 应用程序中使用来自不同设备的相同凭据进行多次登录?