首页 > 解决方案 > 使用 Flux.merge() 的竞争条件

问题描述

我对反应式编程很陌生。我的项目基于 Spring WebFlux。

这是我的合并方法:

//fn1 and fn2 are JS functions with one int param, count - I execute them with param from 0 to count.  
public Flux<String> generateUnordered(String fn1, String fn2, int count) {
    Flux<String> fn1Flux = generateFunctionResultFlux(fn1, count, FUNCTION_1)
            .map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));

    Flux<String> fn2Flux = generateFunctionResultFlux(fn2, count, FUNCTION_2)
            .map(fnResult->String.format(UNORDERED_OUTPUT,fnResult[0],fnResult[1],fnResult[2],fnResult[3]));

    return Flux.merge(fn1Flux, fn2Flux)
            .delayElements(Duration.ofMillis(DELAY));
}

当我使用下面的 JS 函数和 count=2 执行它时

function fn1(number) {return number +100;}
function fn2(number) {return number +200;}

我得到类似的东西:

0,FUNCTION_2,200.0,0

0,FUNCTION_1,200.0,0

但是可以清楚地看到,我以某种方式在 fn1 行中得到了 fn2 结果!

我做错了什么,我该如何解决?

项目的 GitHub 链接:WebFlux 项目

类的 GitHub 链接:FluxGenerator 类

标签: spring-webfluxrace-conditionproject-reactor

解决方案


使用合并运算符源被热切地订阅。所以我猜第一个通量会在订阅后立即发出,第二个会发出。因为它们会立即发射。如果你想看到交错的通量,你需要延迟你的发射。

        Flux<Integer> fluxOfIntegers = Flux.merge(
      evenNumbers.delayElements(Duration.ofMillis(500L)), 
      oddNumbers.delayElements(Duration.ofMillis(300L)));
        
    StepVerifier.create(fluxOfIntegers)
      .expectNext(1)
      .expectNext(2)
      .expectNext(3)
      .expectNext(5)
      .expectNext(4)
      .expectComplete()
      .verify();

如果您想要更多运算符示例,可以查看


推荐阅读