我正在尝试并行化 Flux 的过滤器操作。但是,从完成操作所花费的时间来看,它似乎并没有并行化。对我在这里可能做错的任何见解将不胜感激。谢谢。

    public void testParallelFilteringFlux() {
        long start = Calendar.getInstance().getTimeInMillis();
        log.info("Start time ::{}",Calendar.getInstance().getTimeInMillis());
        Flux<Integer> fluxFromJust = Flux.range(1, 1000000);
        ParallelFlux<Integer> pfilter = fluxFromJust.filter(i -> i == 99999).parallel(4).runOn(Schedulers.parallel());//filter the even numbers only
        Flux<Integer> filter = fluxFromJust.filter(i -> i == 99999);
        filter.subscribe(i->log.info(">>>>>>>>> Found Integer: {}, time: {}",i, Calendar.getInstance().getTimeInMillis() - start));
        pfilter.subscribe(i->log.info(">>>>>>>>> Parallel Found Integer: {}, time: {}",i, Calendar.getInstance().getTimeInMillis() - start));


20:37:29.733 [main] INFO test.ReactorTest - Start time ::1614092849730
20:37:30.040 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:37:30.107 [main] INFO test.ReactorTest - >>>>>>>>> Found Integer: 99999, time: 377
20:37:30.190 [parallel-1] INFO test.ReactorTest - >>>>>>>>> Parallel Found Integer: 99999, time: 460

  • 2 个进程(1 个单线程,1 个并行)并行执行。如果你想让你的结果更精确,你应该一个接一个地运行
  • 你应该至少执行2次测试,因为第一次有几个东西还没有初始化,所以时间考虑到类加载,调度器初始化等......我们在比较2个解决方案时不应该考虑它们.



  • 我运行了 2 次测试以避免在计时中考虑初始化
  • 我在启动并行测试之前等待单线程测试完成,因此它们不会干扰
  • 最后,我使用一个非常简单的谓词(如您的测试)和一个更长的谓词进行测试(我只是放了一个睡眠来模拟更长的处理)。请注意,我减少了较长谓词中的项目数以更快地获得结果。


    public void testParallelFilteringFlux() throws Exception {
        Predicate<Integer> predicate;
        System.out.println("Test with short process in the predicate");
        final int nb1 = 1000000;
        predicate = i -> i == nb1 - 1;
        runTest(nb1, predicate);
        runTest(nb1, predicate);
        System.out.println("Test with longer process in the predicate");
        final int nb2 = 10000;
        predicate = i -> {
            try {
            } catch (InterruptedException e) {
                // ignore
            return i == nb2 - 1;
        runTest(nb2, predicate);
        runTest(nb2, predicate);
    private void runTest(int nb, Predicate<Integer> predicate) {
        long start = System.currentTimeMillis();
        List<Integer> result = testSingleThread(nb, predicate).collectList().block();
        System.out.println("Found with single thread " + result + " in " + (System.currentTimeMillis() - start) + "ms.");
        start = System.currentTimeMillis();
        result = testParallel(nb, predicate).collectList().block();
        System.out.println("Found with parallel " + result + " in " + (System.currentTimeMillis() - start) + "ms.");
    private Flux<Integer> testSingleThread(int nb, Predicate<Integer> predicate) {
        Flux<Integer> fluxFromJust = Flux.range(1, nb);
        Flux<Integer> filter = fluxFromJust.filter(predicate);
        return filter;
    private Flux<Integer> testParallel(int nb, Predicate<Integer> predicate) {
        Flux<Integer> fluxFromJust = Flux.range(1, nb);
        ParallelFlux<Integer> pfilter = fluxFromJust.parallel(4).runOn(Schedulers.parallel()).filter(predicate);
        return pfilter.sequential();


Test with short process in the predicate
Found with single thread [999999] in 126ms.
Found with parallel [999999] in 326ms.
Found with single thread [999999] in 6ms.
Found with parallel [999999] in 191ms.
Test with longer process in the predicate
Found with single thread [9999] in 17474ms.
Found with parallel [9999] in 4528ms.
Found with single thread [9999] in 17575ms.
Found with parallel [9999] in 4563ms.

可以看到,使用较短的谓词,单线程测试更快,但如果谓词的处理时间较长,则时间几乎被 4 整除。
