首页 > 解决方案 > 在 ParallelFlux 上使用线程睡眠时,它不等待睡眠线程完成并执行 onComplete() 函数

问题描述

  1. 在 Flux 上使用 Parallel 时,我正在使用线程睡眠停止线程一段时间,但问题是 Flux 没有等到线程睡眠时间并在订阅时在 onComplete 上执行。

    列表 str = new ArrayList<>(); str.add("弹簧"); str.add("webflux"); str.add("示例");

                AtomicInteger num = new AtomicInteger();
    
                ParallelFlux<Object> names = Flux.fromIterable(str)
                        .log()
                        .parallel(2)
                        .runOn(Schedulers.boundedElastic())
                        .map( s-> {
    
                            if(s.equalsIgnoreCase("webflux")) {
                                try {
                                    System.out.println("waiting...");
                                    Thread.sleep(1000);
                                    System.out.println("done...");
                                } catch (InterruptedException e) {
                                    // TODO Auto-generated catch block
                                    e.printStackTrace();
                                }
                            }
                            return s+" "+num.incrementAndGet();
    
                        });
    
                names.subscribe(s -> {
                    System.out.println("value "+s+" thread : "+Thread.currentThread().getName());
                });
    

输出:

    19:35:24.870 [main] INFO reactor.Flux.Iterable.1 - | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
19:35:24.896 [main] INFO reactor.Flux.Iterable.1 - | request(256)
19:35:24.897 [main] INFO reactor.Flux.Iterable.1 - | onNext(spring)
19:35:24.898 [main] INFO reactor.Flux.Iterable.1 - | onNext(webflux)
19:35:24.898 [main] INFO reactor.Flux.Iterable.1 - | onNext(example)
waiting...
value spring 1 thread : boundedElastic-1
value example 2 thread : boundedElastic-1
19:35:24.899 [main] INFO reactor.Flux.Iterable.1 - | onComplete()

标签: webclientspring-webfluxfluxspring-webclient

解决方案


推荐阅读