首页 > 解决方案 > Project Reactor:调度程序#parallel & Schedulers#elastic purpose

问题描述

我正在学习我正在探索的Project ReactorSchedulers factory

我尝试了以下代码:

    ExecutorService executorService = Executors.newFixedThreadPool(10);
    Flux.range(1,4)
                    .map(i -> {
                        logger.info(i +" [MAP] " + Thread.currentThread().getName());
                        return 10 / i;
                    })
                    .publishOn(Schedulers.fromExecutorService(executorService)) // .publishOn(Schedulers.parallel())
                    .subscribe(
                            n -> {
                                logger.info("START "+((Long)(System.currentTimeMillis() % 10000000L)).toString());
                                try {
                                    Thread.sleep(100);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                logger.info(n.toString());
                                logger.info("END "+((Long)(System.currentTimeMillis() % 10000000L)).toString());
                            }
                    );
        executorService.shutdown();

此代码也与Schedulers.parallel()Schedulers.elastic()一起尝试过。此外,尝试与subscribeOn()操作员一起查看类似的结果。

日志是:

02:07:30.142 [main] INFO  - 1 [MAP] main
02:07:30.143 [main] INFO  - 2 [MAP] main
02:07:30.143 [main] INFO  - 3 [MAP] main
02:07:30.143 [main] INFO  - 4 [MAP] main
02:07:30.143 [pool-1-thread-2] INFO  - START 1050143
02:07:30.247 [pool-1-thread-2] INFO  - 10
02:07:30.247 [pool-1-thread-2] INFO  - END 1050247
02:07:30.247 [pool-1-thread-2] INFO  - START 1050247
02:07:30.350 [pool-1-thread-2] INFO  - 5
02:07:30.350 [pool-1-thread-2] INFO  - END 1050350
02:07:30.350 [pool-1-thread-2] INFO  - START 1050350
02:07:30.455 [pool-1-thread-2] INFO  - 3
02:07:30.455 [pool-1-thread-2] INFO  - END 1050455
02:07:30.455 [pool-1-thread-2] INFO  - START 1050455
02:07:30.557 [pool-1-thread-2] INFO  - 2
02:07:30.558 [pool-1-thread-2] INFO  - END 1050558

由于Flux's 元素是按顺序排序和操作的(从上面的日志中可以看出),一个元素的operator(或operator chain)具有多个线程是没有意义的。我敢肯定,我要么误解了,要么Schedulers在我的基本理解中缺乏某些地方。有人可以指出我正确的方向吗?

我了解Schedulers进行处理asynchronous和取消保留main线程的目的。operator(s)但是,为什么有人要在一次对一个元素进行操作时给多个线程。

只有当我们处理时才有意义flatMap operator吗?

标签: javamultithreadingproject-reactor

解决方案


推荐阅读