首页 > 解决方案 > 如何确保具有无限流的 Flux 在给定时间内完成?

问题描述

我有一个Flux生成器(来自项目反应器)用于延迟 1 秒以发射每个元素的流。而且由于流是无限的,我还决定使用该take()方法要采用多少元素。我想测试这样的方法是否可以在给定的时间内执行。我试图创建受此答案启发的解决方案,但没有成功。

这是Flux方法

import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class FluxAndMonoStreams {
    public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec))
                .map(l -> l.doubleValue())
                .take(numberOfElements)
                .log();
    }
}

这是单元测试。在. _ .thenConsumeWhile(value -> true)_ .expectNextCount(4)_ 但它们都不起作用。.expectNext(0.0, 1.0, 2.0, 3.0).thenAwait(Duration.ofSeconds(4))

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class FluxAndMonoStreamsTest {
    FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();
    @Test
    void testCreateFluxStreamWithMapAndVerifyDelay() {
        Flux<Double> streamLongFlux = myFluxAndMonoStreams
                .createFluxDoubleWithDelay(4, 1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                // .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }
}

我收到这个错误,说我没有使用值1.02.0...基本上我想确保我只0.0, 1.0, 2.0, 3.0在 4 秒内使用值。我怎么做?

12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()

expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))

附加说明:只是为了再澄清一次我想要的行为。如果我取消注释 .expectNext(0.0, 1.0, 2.0, 3.0)并将 4 seconds 替换为 2 seconds .thenAwait(Duration.ofSeconds(2)),则测试应该失败。但事实并非如此。

标签: javaspring-webfluxproject-reactor

解决方案


如果我取消注释expectNext,这个测试对我来说运行“很好”。

    public Flux<Double> create(long numberOfElements, long delaySec) {
        return Flux.interval(Duration.ofSeconds(delaySec)).map(l -> l.doubleValue()).take(numberOfElements).log();
    }

    @Test
    public void virtualTimeTest() {
        Flux<Double> streamLongFlux = create(4,1);

        Scheduler scheduler = Schedulers.newSingle("test");
        AtomicInteger incrementer = new AtomicInteger();

        StepVerifier
                .withVirtualTime(() -> streamLongFlux
                        .subscribeOn(scheduler)
                        .doOnNext(value -> incrementer.incrementAndGet())
                )
                .expectSubscription()
                .thenAwait(Duration.ofSeconds(4))
                // .thenConsumeWhile(value -> true)
                // .expectNextCount(4)
                 .expectNext(0.0, 1.0, 2.0, 3.0)
                .verifyComplete()
        ;
    }

它不在虚拟时间中运行,因为在建立虚拟时间之前调用了 Flux.interval。

 StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .thenAwait(Duration.ofSeconds(4))
                    .expectNext(0.0, 1.0, 2.0, 3.0)
                    .verifyComplete();

这确实在虚拟时间内运行,需要 400 毫秒而不是 4.4 秒

另一个更明确的测试是:

@Test
    public void virtualTime(){
        StepVerifier.withVirtualTime(() -> create(4,1))
                    .expectSubscription()
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(0.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(1.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(2.0)
                    .expectNoEvent(Duration.ofSeconds(1))
                    .expectNext(3.0)
                    .verifyComplete();
    }

推荐阅读