java - 测试内部 Flux.interval
问题描述
我想使用 Reactor 的 Flux 提供一些数据。由于提供这些数据可能需要很长时间,所以我决定引入一种 ping 机制(例如,保持 tcp 连接处于活动状态而不发生超时)。这是我的简化解决方案:
public class Example {
private final DataProvider dataProvider;
public Example(DataProvider dataProvider) {
this.dataProvider = dataProvider;
}
public Flux<String> getData() {
AtomicBoolean inProgress = new AtomicBoolean(true);
Flux<String> dataFlux = dataProvider.provide()
.doFinally(ignoreIt -> inProgress.set(false));
return dataFlux.mergeWith(ping(inProgress::get));
}
private Publisher<String> ping(Supplier<Boolean> inProgress) {
return Flux.interval(Duration.ofSeconds(1), Duration.ofSeconds(1))
.map((tick) -> "ping " + tick)
.takeWhile(ignoreIt -> inProgress.get());
}
interface DataProvider {
Flux<String> provide();
}
public static void main(String[] args) {
Callable<String> dataProviderLogic = () -> {
Thread.sleep(3500);
return "REAL DATA - SHOULD TERMINATE PING";
};
// wrapping synchronous call
DataProvider dataProvider = () -> Mono.fromCallable(dataProviderLogic)
.subscribeOn(Schedulers.boundedElastic())
.flux();
new Example(dataProvider).getData()
.doOnNext(data -> System.out.println("GOT: " + data))
.blockLast();
}
}
上面的代码打印在控制台上:
GOT: ping 0
GOT: ping 1
GOT: ping 2
GOT: REAL DATA - SHOULD TERMINATE PING
所以它按预期工作。
问题是:我怎样才能在 Junit5 测试中测试这种 ping 机制,所以它不会花费很多时间(例如几秒钟)?
在理想情况下,我想编写一个模拟数据提供延迟的测试,检查是否生成了预期的 ping 次数并验证是否发出了完整的信号(以确保 ping 通量按预期终止)。当然我想要一个单元测试,它可以在毫秒内运行。
我试过这个,但没有运气:
@Test
void test() {
TestPublisher<String> publisher = TestPublisher.create();
Flux<String> data = new Example(publisher::flux).getData();
StepVerifier.withVirtualTime(() -> data)
.thenAwait(Duration.ofMillis(3500))
.then(() -> publisher.emit("REAL DATA - SHOULD TERMINATE PING"))
.then(publisher::complete)
.expectNextCount(4)
.verifyComplete();
}
上面的测试以这个错误结束:
java.lang.AssertionError: expectation "expectNextCount(4)" failed (expected: count = 4; actual: counted = 1; signal: onComplete())
是否可以将虚拟时间用于内部创建的 Flux.interval?
任何有关替代ping解决方案的想法将不胜感激。
解决方案
尽管上面的 ping 机制不是最好的(我建议使用Sink
代替AtomicBoolean
和使用takeUntilOther
代替takeWhile
),但在我的情况下,问题可能与并非所有通量指令都用withVirtualTime
. 此代码在上述情况下按预期工作:
@Test
void test() {
StepVerifier.withVirtualTime(() -> {
Flux<String> data = Flux.just("REAL DATA - SHOULD TERMINATE PING").delayElements(Duration.ofMillis(3200));
return new Example(() -> data).getData();
})
.thenAwait(Duration.ofMillis(3500))
.expectNextCount(4)
.thenAwait(Duration.ofMillis(1000))
.verifyComplete();
}
推荐阅读
- java - 为什么我们需要装饰器模式中的抽象装饰器类?
- gcc - Rstudio 无法安装包 devtools:gcc 错误
- kotlin - 如何通过反射获取 Kotlin Collection<*> 信息?
- javascript - 根据属性将数组拆分为不同大小的块
- c# - c#使用组合框将文件夹从C盘复制到可移动驱动器
- c# - 传递给 HttpContext.SignInAsync 和 CookieAuthenticationOptions.ExpireTimeSpan 的时间之间的差异
- node.js - 在开发者工具中隐藏文件?
- sql-server - Go 无法从 MS SQL 2014 读取最后一个 nvarchar(max) 值
- oracle - 尝试创建函数时出现编译错误
- android - 如何通过列表