首页 > 解决方案 > StepVerifier Redis 无限流

问题描述

我正在使用 Redis Streams 创建一个无限流,然后我创建了一个方法来侦听流上的新项目。我想测试此方法以确保在连接到 redis 流后,我将在流中获得新项目。我正在为 redis 使用 spring-webflux、junit5 和测试容器。这是我做的测试:

@Autowired 
private ClassUnderTest template;

@SpyBean 
private ReactiveStringRedisTemplate reactiveStringRedisTemplate;

@Test
void streamFromLatest() {

    final ConfigurationRedis configRedis = ...;

    StepVerifier.create(template.streamFromLatest(streamKey))
        .expectNextCount(0)
        .then(
            () ->
                reactiveStringRedisTemplate
                    .opsForStream()
                    .add(StreamRecords.objectBacked(configRedis).withStreamKey(streamKey))
                    .block())
        .expectNext(configRedis)
        .thenCancel()
        .verify(Duration.ofSeconds(20));
}

当我在 Eclipse 中运行测试时有时会通过,有时会失败,但是当我运行 maven 时总是失败。

这是正在测试的方法:

private final StreamReceiver<String, MapRecord<String, String, String>> streamReceiver;

public Flux<ConfigurationRedis> streamFromLatest(@NotNull final String streamKey) {

    return streamReceiver
        .receive(StreamOffset.latest(streamKey))
        .doOnNext(value -> log.debug("New value on stream {}", value))
        .map(map -> mapper.map(map.getValue(), ConfigurationRedis.class))
        .doOnCancel(() -> log.debug("One of the consumers cancel"));
}

StepVerifier 有什么方法可以在订阅该streamFromLatest方法后始终将项目添加到流中?

标签: redisspring-webfluxjunit5spring-testtestcontainers

解决方案


推荐阅读