redis - StepVerifier Redis 无限流
问题描述
我正在使用 Redis Streams 创建一个无限流,然后我创建了一个方法来侦听流上的新项目。我想测试此方法以确保在连接到 redis 流后,我将在流中获得新项目。我正在为 redis 使用 spring-webflux、junit5 和测试容器。这是我做的测试:
@Autowired
private ClassUnderTest template;
@SpyBean
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;
@Test
void streamFromLatest() {
final ConfigurationRedis configRedis = ...;
StepVerifier.create(template.streamFromLatest(streamKey))
.expectNextCount(0)
.then(
() ->
reactiveStringRedisTemplate
.opsForStream()
.add(StreamRecords.objectBacked(configRedis).withStreamKey(streamKey))
.block())
.expectNext(configRedis)
.thenCancel()
.verify(Duration.ofSeconds(20));
}
当我在 Eclipse 中运行测试时有时会通过,有时会失败,但是当我运行 maven 时总是失败。
这是正在测试的方法:
private final StreamReceiver<String, MapRecord<String, String, String>> streamReceiver;
public Flux<ConfigurationRedis> streamFromLatest(@NotNull final String streamKey) {
return streamReceiver
.receive(StreamOffset.latest(streamKey))
.doOnNext(value -> log.debug("New value on stream {}", value))
.map(map -> mapper.map(map.getValue(), ConfigurationRedis.class))
.doOnCancel(() -> log.debug("One of the consumers cancel"));
}
StepVerifier 有什么方法可以在订阅该streamFromLatest
方法后始终将项目添加到流中?