spring-webflux - 使用 Flux.merge() 的竞争条件
问题描述
我对反应式编程很陌生。我的项目基于 Spring WebFlux。
这是我的合并方法:
//fn1 and fn2 are JS functions with one int param, count - I execute them with param from 0 to count.
public Flux<String> generateUnordered(String fn1, String fn2, int count) {
Flux<String> fn1Flux = generateFunctionResultFlux(fn1, count, FUNCTION_1)
.map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));
Flux<String> fn2Flux = generateFunctionResultFlux(fn2, count, FUNCTION_2)
.map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));
return Flux.merge(fn1Flux, fn2Flux)
.delayElements(Duration.ofMillis(DELAY));
}
当我使用下面的 JS 函数和 count=2 执行它时
function fn1(number) {return number +100;}
function fn2(number) {return number +200;}
我得到类似的东西:
0,FUNCTION_2,200.0,0
0,FUNCTION_1,200.0,0
但是可以清楚地看到,我以某种方式在 fn1 行中得到了 fn2 结果!
我做错了什么,我该如何解决?
项目的 GitHub 链接:WebFlux 项目
类的 GitHub 链接:FluxGenerator 类
解决方案
使用合并运算符源被热切地订阅。所以我猜第一个通量会在订阅后立即发出,第二个会发出。因为它们会立即发射。如果你想看到交错的通量,你需要延迟你的发射。
Flux<Integer> fluxOfIntegers = Flux.merge(
evenNumbers.delayElements(Duration.ofMillis(500L)),
oddNumbers.delayElements(Duration.ofMillis(300L)));
StepVerifier.create(fluxOfIntegers)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(5)
.expectNext(4)
.expectComplete()
.verify();
如果您想要更多运算符示例,可以查看此
推荐阅读
- node.js - firebase 管理代码完成在 vscode 中不起作用
- xml-parsing - Pentaho 数据集成勺 XML 解析
- php - 如何从 laravel 5.5 的存储内部文件夹中获取文件?
- c# - 图像之间的平等c#
- javascript - oauth 中的会话管理位置启用 Web 应用程序?
- javascript - 无法识别阵列
- android - 支持库 v7 中的 EditTextPreference 因 NullPointerException 而崩溃
- javascript - 如何将 JQuery Validation 与 JQuery 多选插件一起使用
- java - 使用 Java 进行数据迁移
- llvm - LLVM 6.0.0 HowToUseJit 示例程序段错误