reactive-programming - 并行化过滤操作
问题描述
我正在尝试并行化 Flux 的过滤器操作。但是,从完成操作所花费的时间来看,它似乎并没有并行化。对我在这里可能做错的任何见解将不胜感激。谢谢。
@Test
public void testParallelFilteringFlux() {
long start = Calendar.getInstance().getTimeInMillis();
log.info("Start time ::{}",Calendar.getInstance().getTimeInMillis());
Flux<Integer> fluxFromJust = Flux.range(1, 1000000);
ParallelFlux<Integer> pfilter = fluxFromJust.filter(i -> i == 99999).parallel(4).runOn(Schedulers.parallel());//filter the even numbers only
Flux<Integer> filter = fluxFromJust.filter(i -> i == 99999);
filter.subscribe(i->log.info(">>>>>>>>> Found Integer: {}, time: {}",i, Calendar.getInstance().getTimeInMillis() - start));
pfilter.subscribe(i->log.info(">>>>>>>>> Parallel Found Integer: {}, time: {}",i, Calendar.getInstance().getTimeInMillis() - start));
}
输出是:
20:37:29.733 [main] INFO test.ReactorTest - Start time ::1614092849730
20:37:30.040 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:37:30.107 [main] INFO test.ReactorTest - >>>>>>>>> Found Integer: 99999, time: 377
20:37:30.190 [parallel-1] INFO test.ReactorTest - >>>>>>>>> Parallel Found Integer: 99999, time: 460
Process finished with exit code
解决方案
它是并行完成的,但是有几点可以解释为什么在您的测试中使用并行的时间更长,我将尝试解释。
首先,您的测试并不准确,因为:
- 2 个进程(1 个单线程,1 个并行)并行执行。如果你想让你的结果更精确,你应该一个接一个地运行
- 你应该至少执行2次测试,因为第一次有几个东西还没有初始化,所以时间考虑到类加载,调度器初始化等......我们在比较2个解决方案时不应该考虑它们.
但这不是最重要的一点。实际上并行处理过滤器需要在幕后做更多的工作,以拆分数据并将其分派到不同的线程。因此,因为过滤器中的谓词非常简单(只是一个比较),所以最后在单个线程中一次性完成它比以并行方式执行它更有效。如果过滤器的处理时间更重要,并行方式将变得更有效,因为这个处理时间将(或多或少)除以并行线程的数量。
我重写你的测试来说明这些观点:
- 我运行了 2 次测试以避免在计时中考虑初始化
- 我在启动并行测试之前等待单线程测试完成,因此它们不会干扰
- 最后,我使用一个非常简单的谓词(如您的测试)和一个更长的谓词进行测试(我只是放了一个睡眠来模拟更长的处理)。请注意,我减少了较长谓词中的项目数以更快地获得结果。
这是代码:
@Test
public void testParallelFilteringFlux() throws Exception {
Predicate<Integer> predicate;
System.out.println("Test with short process in the predicate");
final int nb1 = 1000000;
predicate = i -> i == nb1 - 1;
runTest(nb1, predicate);
runTest(nb1, predicate);
System.out.println("Test with longer process in the predicate");
final int nb2 = 10000;
predicate = i -> {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
// ignore
}
return i == nb2 - 1;
};
runTest(nb2, predicate);
runTest(nb2, predicate);
}
private void runTest(int nb, Predicate<Integer> predicate) {
long start = System.currentTimeMillis();
List<Integer> result = testSingleThread(nb, predicate).collectList().block();
System.out.println("Found with single thread " + result + " in " + (System.currentTimeMillis() - start) + "ms.");
start = System.currentTimeMillis();
result = testParallel(nb, predicate).collectList().block();
System.out.println("Found with parallel " + result + " in " + (System.currentTimeMillis() - start) + "ms.");
}
private Flux<Integer> testSingleThread(int nb, Predicate<Integer> predicate) {
Flux<Integer> fluxFromJust = Flux.range(1, nb);
Flux<Integer> filter = fluxFromJust.filter(predicate);
return filter;
}
private Flux<Integer> testParallel(int nb, Predicate<Integer> predicate) {
Flux<Integer> fluxFromJust = Flux.range(1, nb);
ParallelFlux<Integer> pfilter = fluxFromJust.parallel(4).runOn(Schedulers.parallel()).filter(predicate);
return pfilter.sequential();
}
这是输出:
Test with short process in the predicate
Found with single thread [999999] in 126ms.
Found with parallel [999999] in 326ms.
Found with single thread [999999] in 6ms.
Found with parallel [999999] in 191ms.
Test with longer process in the predicate
Found with single thread [9999] in 17474ms.
Found with parallel [9999] in 4528ms.
Found with single thread [9999] in 17575ms.
Found with parallel [9999] in 4563ms.
可以看到,使用较短的谓词,单线程测试更快,但如果谓词的处理时间较长,则时间几乎被 4 整除。
推荐阅读
- reactjs - 无效的钩子调用 React
- javascript - 你好,我怎样才能找到这个阵列中最大的平均乘客和有它的公司?
- php - FilesystemManager.php 第 121 行中的 Laravel 5.2 InvalidArgumentException:不支持驱动程序 []
- c++ - 介子添加现有的dll作为依赖
- java - java类型转换重要吗?在初学者阶段是否一定需要知道?
- pl-i - 如何获取字符串的当前剩余长度?
- spring - 如何修复表单验证,当它不起作用时
- gtk - GTK:如何在顶部窗口边框上获取按钮释放事件
- c++ - (C++ 和 OpenGL)我正在尝试在批处理渲染器中旋转一组顶点(它将模拟一个正方形),但它不是 100% 工作:(
- gcc - 多架构构建系统