首页 > 解决方案 > Reactor Sinks:如何控制 onNext() 信号的调用?

问题描述

给定一个重放接收器,是否可以确保在继续执行之前成功向下游发出 onNext() 信号?

显示此问题的一个简单示例是:

    @Test
    void sinksEmitDelayTest() throws InterruptedException {
        Sinks.Many<State> stateSink = Sinks.many().replay().limit(1);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        stateSink
                .asFlux()
                .switchMap(state -> Mono
                        .delay(Duration.ofMillis(1000))
                        .flatMap(i -> {
                            if (state.on)
                                return Mono.error(new Throwable("State must be off"));
                            else
                                return Mono.just(state);
                        })
                )
                .subscribe(state -> countDownLatch.countDown());

        State state = new State();
        if (stateSink.tryEmitNext(state) == Sinks.EmitResult.OK)
            state.on = true;
        else
            throw new RuntimeException("failed to emit");

        Assertions.assertTrue(countDownLatch.await(2000, TimeUnit.MILLISECONDS));
    }

这里的问题是 ,tryEmitNext实际上并没有调用onNext,而是在 中添加了一个新NodeReplayBuffer,因此稍后调用onNext,因此允许在 能够对状态起作用之前切换on到。trueswitchMap

注意:测试甚至没有延迟就失败了。延迟只是为了确保它始终失败。

注意: 可能有任意多的订阅者,在切换到stateSink时不一定知道。ontrue

编辑:

对于实际的静态情况,一种解决方法如下,但是,感觉并不好,看起来也不是真正的线程安全,如果在for循环等待时添加新订阅者,很可能会失败:

    @Test
    void sinksEmitDelayTest() throws InterruptedException {
        Sinks.Many<State> stateSink = Sinks.many().replay().limit(1);

        CountDownLatch countDownLatch = new CountDownLatch(1);

        AtomicInteger subscriptionControl = new AtomicInteger(0);

        stateSink
                .asFlux()
                .switchMap(state -> Mono
                        .delay(Duration.ofMillis(10))
                        .flatMap(i -> {
                            if (state.on)
                                return Mono.error(new Throwable("State must be off"));
                            else
                                return Mono.just(state);
                        })
                        .doOnNext(next -> subscriptionControl.decrementAndGet())
                )
                .subscribe(state -> countDownLatch.countDown());

        State state = new State();
        subscriptionControl.set(stateSink.currentSubscriberCount());
        if (stateSink.tryEmitNext(state) == Sinks.EmitResult.OK)
            for (; ; ) {
                if (subscriptionControl.get() == 0) {
                    state.on = true;
                    break;
                }
            }
        else
            throw new RuntimeException("failed to emit");

        Assertions.assertTrue(countDownLatch.await(2000, TimeUnit.MILLISECONDS));
    }

标签: project-reactor

解决方案


推荐阅读