首页 > 解决方案 > 测试内部 Flux.interval

问题描述

我想使用 Reactor 的 Flux 提供一些数据。由于提供这些数据可能需要很长时间,所以我决定引入一种 ping 机制(例如,保持 tcp 连接处于活动状态而不发生超时)。这是我的简化解决方案:


public class Example {

    private final DataProvider dataProvider;

    public Example(DataProvider dataProvider) {
        this.dataProvider = dataProvider;
    }

    public Flux<String> getData() {
        AtomicBoolean inProgress = new AtomicBoolean(true);

        Flux<String> dataFlux = dataProvider.provide()
            .doFinally(ignoreIt -> inProgress.set(false));

        return dataFlux.mergeWith(ping(inProgress::get));
    }

    private Publisher<String> ping(Supplier<Boolean> inProgress) {
        return Flux.interval(Duration.ofSeconds(1), Duration.ofSeconds(1))
                .map((tick) -> "ping " + tick)
                .takeWhile(ignoreIt -> inProgress.get());
    }

    interface DataProvider {
        Flux<String> provide();
    }

    public static void main(String[] args) {
        Callable<String> dataProviderLogic = () -> {
            Thread.sleep(3500);
            return "REAL DATA - SHOULD TERMINATE PING";
        };

        // wrapping synchronous call
        DataProvider dataProvider = () -> Mono.fromCallable(dataProviderLogic)
                .subscribeOn(Schedulers.boundedElastic())
                .flux();

        new Example(dataProvider).getData()
            .doOnNext(data -> System.out.println("GOT: " + data))
            .blockLast();
    }
}

上面的代码打印在控制台上:

GOT: ping 0
GOT: ping 1
GOT: ping 2
GOT: REAL DATA - SHOULD TERMINATE PING

所以它按预期工作。

问题是:我怎样才能在 Junit5 测试中测试这种 ping 机制,所以它不会花费很多时间(例如几秒钟)

在理想情况下,我想编写一个模拟数据提供延迟的测试,检查是否生成了预期的 ping 次数并验证是否发出了完整的信号(以确保 ping 通量按预期终止)。当然我想要一个单元测试,它可以在毫秒内运行。

我试过这个,但没有运气:

    @Test
    void test() {
        TestPublisher<String> publisher = TestPublisher.create();

        Flux<String> data = new Example(publisher::flux).getData();

        StepVerifier.withVirtualTime(() -> data)
                .thenAwait(Duration.ofMillis(3500))
                .then(() -> publisher.emit("REAL DATA - SHOULD TERMINATE PING"))
                .then(publisher::complete)
                .expectNextCount(4)
                .verifyComplete();
    }

上面的测试以这个错误结束:

java.lang.AssertionError: expectation "expectNextCount(4)" failed (expected: count = 4; actual: counted = 1; signal: onComplete())

是否可以将虚拟时间用于内部创建的 Flux.interval?

任何有关替代ping解决方案的想法将不胜感激。

标签: javaunit-testingproject-reactor

解决方案


尽管上面的 ping 机制不是最好的(我建议使用Sink代替AtomicBoolean和使用takeUntilOther代替takeWhile),但在我的情况下,问题可能与并非所有通量指令都用withVirtualTime. 此代码在上述情况下按预期工作:

    @Test
    void test() {
        StepVerifier.withVirtualTime(() -> {
            Flux<String> data = Flux.just("REAL DATA - SHOULD TERMINATE PING").delayElements(Duration.ofMillis(3200));
            return new Example(() -> data).getData();
        })
                .thenAwait(Duration.ofMillis(3500))
                .expectNextCount(4)
                .thenAwait(Duration.ofMillis(1000))
                .verifyComplete();
    }

推荐阅读