java - 如何确保具有无限流的 Flux 在给定时间内完成?
问题描述
我有一个Flux
生成器(来自项目反应器)用于延迟 1 秒以发射每个元素的流。而且由于流是无限的,我还决定使用该take()
方法要采用多少元素。我想测试这样的方法是否可以在给定的时间内执行。我试图创建受此答案启发的解决方案,但没有成功。
这是Flux
方法
import reactor.core.publisher.Flux;
import java.time.Duration;
@Slf4j
public class FluxAndMonoStreams {
public Flux<Double> createFluxDoubleWithDelay(long numberOfElements, long delaySec) {
return Flux.interval(Duration.ofSeconds(delaySec))
.map(l -> l.doubleValue())
.take(numberOfElements)
.log();
}
}
这是单元测试。在. _ .thenConsumeWhile(value -> true)
_ .expectNextCount(4)
_ 但它们都不起作用。.expectNext(0.0, 1.0, 2.0, 3.0)
.thenAwait(Duration.ofSeconds(4))
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
public class FluxAndMonoStreamsTest {
FluxAndMonoStreams myFluxAndMonoStreams = new FluxAndMonoStreams();
@Test
void testCreateFluxStreamWithMapAndVerifyDelay() {
Flux<Double> streamLongFlux = myFluxAndMonoStreams
.createFluxDoubleWithDelay(4, 1);
Scheduler scheduler = Schedulers.newSingle("test");
AtomicInteger incrementer = new AtomicInteger();
StepVerifier
.withVirtualTime(() -> streamLongFlux
.subscribeOn(scheduler)
.doOnNext(value -> incrementer.incrementAndGet())
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
// .thenConsumeWhile(value -> true)
// .expectNextCount(4)
// .expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete()
;
}
}
我收到这个错误,说我没有使用值1.0
,2.0
...基本上我想确保我只0.0, 1.0, 2.0, 3.0
在 4 秒内使用值。我怎么做?
12:37:31.853 [Test worker] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:37:31.912 [test-1] INFO reactor.Flux.Take.1 - onSubscribe(FluxTake.TakeSubscriber)
12:37:31.915 [test-1] INFO reactor.Flux.Take.1 - request(unbounded)
12:37:32.917 [parallel-1] INFO reactor.Flux.Take.1 - onNext(0.0)
12:37:32.921 [parallel-1] INFO reactor.Flux.Take.1 - cancel()
expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
java.lang.AssertionError: expectation "expectComplete" failed (expected: onComplete(); actual: onNext(0.0))
附加说明:只是为了再澄清一次我想要的行为。如果我取消注释 .expectNext(0.0, 1.0, 2.0, 3.0)
并将 4 seconds 替换为 2 seconds .thenAwait(Duration.ofSeconds(2))
,则测试应该失败。但事实并非如此。
解决方案
如果我取消注释expectNext,这个测试对我来说运行“很好”。
public Flux<Double> create(long numberOfElements, long delaySec) {
return Flux.interval(Duration.ofSeconds(delaySec)).map(l -> l.doubleValue()).take(numberOfElements).log();
}
@Test
public void virtualTimeTest() {
Flux<Double> streamLongFlux = create(4,1);
Scheduler scheduler = Schedulers.newSingle("test");
AtomicInteger incrementer = new AtomicInteger();
StepVerifier
.withVirtualTime(() -> streamLongFlux
.subscribeOn(scheduler)
.doOnNext(value -> incrementer.incrementAndGet())
)
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
// .thenConsumeWhile(value -> true)
// .expectNextCount(4)
.expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete()
;
}
它不在虚拟时间中运行,因为在建立虚拟时间之前调用了 Flux.interval。
StepVerifier.withVirtualTime(() -> create(4,1))
.expectSubscription()
.thenAwait(Duration.ofSeconds(4))
.expectNext(0.0, 1.0, 2.0, 3.0)
.verifyComplete();
这确实在虚拟时间内运行,需要 400 毫秒而不是 4.4 秒
另一个更明确的测试是:
@Test
public void virtualTime(){
StepVerifier.withVirtualTime(() -> create(4,1))
.expectSubscription()
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(0.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(1.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(2.0)
.expectNoEvent(Duration.ofSeconds(1))
.expectNext(3.0)
.verifyComplete();
}
推荐阅读
- python - 网络爬取:python 使用 -o file.json 将文件保存为 utf-8:输出显示像 \u00a9 这样的字符
- entity-framework - C# EF 不使用 ForEach 保存多条记录
- android - com.android.builder.dexing.DexArchiveBuilderException:dexing 时出错
- angular - 类型错误:e 不是函数 | 角 7
- python - LookupError:未注册编解码器搜索功能:找不到编码
- javascript - 我可以在不同的元素上添加相同的 document.createElement 元素吗?
- swift - 内存泄漏警报控制器 / UIImagePickerController
- c# - 使用 c# sslclient 的 Https 帖子
- javascript - 如何在 Twitter 上分享图像和详细信息
- sql - SQL Server 中的动态公式列