java - Project Reactor:调度程序#parallel & Schedulers#elastic purpose
问题描述
我正在学习我正在探索的Project ReactorSchedulers factory
。
我尝试了以下代码:
ExecutorService executorService = Executors.newFixedThreadPool(10);
Flux.range(1,4)
.map(i -> {
logger.info(i +" [MAP] " + Thread.currentThread().getName());
return 10 / i;
})
.publishOn(Schedulers.fromExecutorService(executorService)) // .publishOn(Schedulers.parallel())
.subscribe(
n -> {
logger.info("START "+((Long)(System.currentTimeMillis() % 10000000L)).toString());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
logger.info(n.toString());
logger.info("END "+((Long)(System.currentTimeMillis() % 10000000L)).toString());
}
);
executorService.shutdown();
此代码也与Schedulers.parallel()和Schedulers.elastic()一起尝试过。此外,尝试与subscribeOn()
操作员一起查看类似的结果。
日志是:
02:07:30.142 [main] INFO - 1 [MAP] main
02:07:30.143 [main] INFO - 2 [MAP] main
02:07:30.143 [main] INFO - 3 [MAP] main
02:07:30.143 [main] INFO - 4 [MAP] main
02:07:30.143 [pool-1-thread-2] INFO - START 1050143
02:07:30.247 [pool-1-thread-2] INFO - 10
02:07:30.247 [pool-1-thread-2] INFO - END 1050247
02:07:30.247 [pool-1-thread-2] INFO - START 1050247
02:07:30.350 [pool-1-thread-2] INFO - 5
02:07:30.350 [pool-1-thread-2] INFO - END 1050350
02:07:30.350 [pool-1-thread-2] INFO - START 1050350
02:07:30.455 [pool-1-thread-2] INFO - 3
02:07:30.455 [pool-1-thread-2] INFO - END 1050455
02:07:30.455 [pool-1-thread-2] INFO - START 1050455
02:07:30.557 [pool-1-thread-2] INFO - 2
02:07:30.558 [pool-1-thread-2] INFO - END 1050558
由于Flux
's 元素是按顺序排序和操作的(从上面的日志中可以看出),一个元素的operator
(或operator chain
)具有多个线程是没有意义的。我敢肯定,我要么误解了,要么Schedulers
在我的基本理解中缺乏某些地方。有人可以指出我正确的方向吗?
我了解Schedulers
进行处理asynchronous
和取消保留main
线程的目的。operator(s)
但是,为什么有人要在一次对一个元素进行操作时给多个线程。
只有当我们处理时才有意义flatMap
operator
吗?
解决方案
推荐阅读
- vim - 在vim中以可视模式选择多行
- php - 从 AJAX POST 到 PHP 的未定义索引?
- javascript - 额外的 div 容器打破了 html5 视频播放功能
- debian - W:无法获取 http://httpredir.debian.org/debian/dists/jessie-updates/main/binary-amd64/Packages 404 Not Found
- python - 如何将行值与不同列中的所有行进行比较并使用 Pandas 分隔所有匹配的行
- sql - 在 sql server 中使用虚拟数据在查询结果集中添加额外的行
- api - 在 react-native snap carousel 中从 api 获取数据但无法正常工作
- python - 检查 pandas 列的连续行值
- sas - 当一个目标有多个数据时,如何进行因子分析?
- django-models - Django:以 django 形式管理多对多字段的最佳方法是什么?