Parallelizing a filter operation

Problem description

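I am trying to parallelize the filter operation of a Flux. However, judging by the time it takes to complete, it does not seem to run in parallel. Any insight into what I might be doing wrong here would be greatly appreciated. Thanks.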

    @Test
    public void testParallelFilteringFlux() {
        long start = Calendar.getInstance().getTimeInMillis();
        log.info("Start time ::{}", Calendar.getInstance().getTimeInMillis());
        Flux<Integer> fluxFromJust = Flux.range(1, 1000000);
        // keep only 99999 (note that filter() is applied before parallel(4))
        ParallelFlux<Integer> pfilter = fluxFromJust.filter(i -> i == 99999).parallel(4).runOn(Schedulers.parallel());
        Flux<Integer> filter = fluxFromJust.filter(i -> i == 99999);
        filter.subscribe(i -> log.info(">>>>>>>>> Found Integer: {}, time: {}", i, Calendar.getInstance().getTimeInMillis() - start));
        pfilter.subscribe(i -> log.info(">>>>>>>>> Parallel Found Integer: {}, time: {}", i, Calendar.getInstance().getTimeInMillis() - start));
    }

The output is:

20:37:29.733 [main] INFO test.ReactorTest - Start time ::1614092849730
20:37:30.040 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:37:30.107 [main] INFO test.ReactorTest - >>>>>>>>> Found Integer: 99999, time: 377
20:37:30.190 [parallel-1] INFO test.ReactorTest - >>>>>>>>> Parallel Found Integer: 99999, time: 460

Process finished with exit code 

Tags: reactive-programming, project-reactor

Solution


It is done in parallel, but a few things explain why the parallel version takes longer in your test. I'll try to explain them.

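First, your test is not accurate, because:

  • The 2 pipelines (1 single-threaded, 1 parallel) run at the same time. To get more precise results, you should run them one after the other.
  • You should run the test at least twice. On the first run several things are not initialized yet, so the timing includes class loading, scheduler initialization, etc. We should not count these when comparing the 2 solutions.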
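The two points above (run the measurements one after the other, and discard warm-up runs) can be folded into a small helper. The following is a minimal plain-JDK sketch, not a substitute for a proper benchmark harness. `WarmupTimer`, `timeAfterWarmup` and the `countEvens` workload are hypothetical names used only for illustration.

```java
import java.util.function.Supplier;

public class WarmupTimer {

    // Hypothetical helper (not part of Reactor): runs `task` `warmups` times to
    // absorb one-off costs such as class loading, JIT compilation and scheduler
    // start-up, then times one more run and returns its duration in milliseconds.
    static <T> long timeAfterWarmup(Supplier<T> task, int warmups) {
        for (int i = 0; i < warmups; i++) {
            task.get(); // warm-up run, result discarded
        }
        long start = System.nanoTime(); // monotonic clock, suited to measuring intervals
        task.get();                     // the measured run finishes before the clock is read again
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Stand-in workload for the example: counts the even numbers in 1..n.
    static long countEvens(int n) {
        long count = 0;
        for (int i = 1; i <= n; i++) {
            if (i % 2 == 0) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // The two measurements run one after the other, never concurrently.
        long first = timeAfterWarmup(() -> countEvens(1_000_000), 2);
        long second = timeAfterWarmup(() -> countEvens(1_000_000), 2);
        System.out.println("first: " + first + " ms, second: " + second + " ms");
    }
}
```

In the test from the question, the task could be something like `() -> testParallel(nb, predicate).collectList().block()`. Because `block()` waits for completion, the measured run really ends before the clock is read again.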

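But that is not the most important point. Processing a filter in parallel actually requires more work behind the scenes, because the data must be split and dispatched to the different threads. Your filter's predicate is very simple (just a comparison), so in the end it is more efficient to run it all at once on a single thread than in parallel. If the filter's processing time were more significant, the parallel version would become more efficient, because that processing time would be divided (more or less) by the number of parallel threads.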
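A related detail is where the predicate actually runs. In the question's code, `.filter(...)` comes before `.parallel(4)`. This means the predicate belongs to the sequential part of the pipeline, and only the operators after `runOn` execute on the rails. The answer's `testParallel` calls `filter` after `runOn` instead. The sketch below records which threads evaluate the predicate in each ordering. It assumes reactor-core is on the classpath, and `FilterPlacement` and its methods are hypothetical names. In this example only one element passes the filter. With that setup, I would expect the first set to contain only the calling thread and the second to contain only `parallel-N` threads. The exact thread assignment may differ with other predicates or Reactor versions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class FilterPlacement {

    // Builds a predicate that keeps only `target` and records the name of
    // every thread that evaluates it.
    static Predicate<Integer> recording(int target, Set<String> threads) {
        return i -> {
            threads.add(Thread.currentThread().getName());
            return i == target;
        };
    }

    // Ordering from the question: filter() BEFORE parallel(), so the predicate
    // is part of the sequential source feeding the rails.
    static Set<String> filterBeforeParallel(int n) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.range(1, n)
                .filter(recording(n - 1, threads))
                .parallel(4)
                .runOn(Schedulers.parallel())
                .sequential()
                .blockLast(); // wait until the whole pipeline completes
        return threads;
    }

    // Ordering from the answer's testParallel(): filter() AFTER runOn(), so the
    // predicate is evaluated on the rails, i.e. on Schedulers.parallel() threads.
    static Set<String> filterAfterRunOn(int n) {
        Set<String> threads = ConcurrentHashMap.newKeySet();
        Flux.range(1, n)
                .parallel(4)
                .runOn(Schedulers.parallel())
                .filter(recording(n - 1, threads))
                .sequential()
                .blockLast();
        return threads;
    }

    public static void main(String[] args) {
        System.out.println("filter before parallel(): " + filterBeforeParallel(10_000));
        System.out.println("filter after runOn():     " + filterAfterRunOn(10_000));
    }
}
```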

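I rewrote your test to illustrate these points:

  • I run the test twice so that initialization is not included in the timings.
  • I wait for the single-threaded test to finish before starting the parallel test, so that they don't interfere with each other.
  • Finally, I test with a very simple predicate (as in your test) and with a longer one (I just added a sleep to simulate longer processing). Note that I reduced the number of items for the longer predicate to get results faster.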

Here is the code:

    @Test
    public void testParallelFilteringFlux() throws Exception {
        Predicate<Integer> predicate;
        System.out.println("Test with short process in the predicate");
        final int nb1 = 1000000;
        predicate = i -> i == nb1 - 1;
        runTest(nb1, predicate);
        runTest(nb1, predicate);
        System.out.println("Test with longer process in the predicate");
        final int nb2 = 10000;
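        predicate = i -> {
            try {
                Thread.sleep(1); // simulate about 1 ms of work per element
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
            }
            return i == nb2 - 1;
        };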
        runTest(nb2, predicate);
        runTest(nb2, predicate);
    }
    
    private void runTest(int nb, Predicate<Integer> predicate) {
        long start = System.currentTimeMillis();
        // block() waits for completion, so the parallel run below only starts after this one has finished
        List<Integer> result = testSingleThread(nb, predicate).collectList().block();
        System.out.println("Found with single thread " + result + " in " + (System.currentTimeMillis() - start) + "ms.");
        
        start = System.currentTimeMillis();
        result = testParallel(nb, predicate).collectList().block();
        System.out.println("Found with parallel " + result + " in " + (System.currentTimeMillis() - start) + "ms.");
    }
    
    private Flux<Integer> testSingleThread(int nb, Predicate<Integer> predicate) {
        Flux<Integer> fluxFromJust = Flux.range(1, nb);
        Flux<Integer> filter = fluxFromJust.filter(predicate);
        return filter;
    }
    
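    private Flux<Integer> testParallel(int nb, Predicate<Integer> predicate) {
        Flux<Integer> fluxFromJust = Flux.range(1, nb);
        // filter() is placed after runOn(), so the predicate is evaluated on the 4 rails
        // (threads of Schedulers.parallel()); sequential() merges the rails back into one Flux
        ParallelFlux<Integer> pfilter = fluxFromJust.parallel(4).runOn(Schedulers.parallel()).filter(predicate);
        return pfilter.sequential();
    }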

Here is the output:

Test with short process in the predicate
Found with single thread [999999] in 126ms.
Found with parallel [999999] in 326ms.
Found with single thread [999999] in 6ms.
Found with parallel [999999] in 191ms.
Test with longer process in the predicate
Found with single thread [9999] in 17474ms.
Found with parallel [9999] in 4528ms.
Found with single thread [9999] in 17575ms.
Found with parallel [9999] in 4563ms.

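As you can see, with the short predicate the single-threaded test is faster, but when the predicate takes longer to process, the time is divided by almost 4.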
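The "divided by almost 4" observation can be quantified with the speedup $S = T_{\text{single}} / T_{\text{parallel}}$. The calculation below uses only the second-run (warmed-up) timings printed above.

```latex
\begin{aligned}
S_{\text{short}} &= \frac{6\ \text{ms}}{191\ \text{ms}} \approx 0.03 \\
S_{\text{long}}  &= \frac{17575\ \text{ms}}{4563\ \text{ms}} \approx 3.85 \\
\frac{17575\ \text{ms}}{10000\ \text{items}} &\approx 1.76\ \text{ms per item},
\qquad \frac{17575\ \text{ms}}{4\ \text{rails}} \approx 4394\ \text{ms}
\end{aligned}
```

On that machine, the `sleep(1)` predicate therefore costs about 1.76 ms per item. The measured parallel time of 4563 ms is about 170 ms (roughly 4%) above an ideal 4-way split. With the cheap predicate, the dispatch overhead dominates and the parallel version is about 30 times slower.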

