首页 > 解决方案 > 通过 Flux.create / Flux.switchMap 的可变间隔

问题描述

我需要一个基于用户输入可变的计时器。这是最小的例子:

Flux.<Integer> create(e -> {
   log.info("create"); // Never gets triggered
   e.next(2); // Sample user input: change to 2 second interval
})
   .switchMap(v -> Flux.interval(Duration.ofSeconds(v)))
   .startWith(Flux.interval(Duration.ofSeconds(1)))
   .subscribe(e -> log.info("subscribe: {}", e)); // This works

在上面:

上面的switchMap部分在下面工作,即我看到它每秒记录“订阅:N”,但“创建”不会被记录,e.next(2)也不会被调用。

为什么这不起作用?这个用例有更好的解决方案吗?

标签: javaproject-reactor

解决方案


如 JavaDoc 中所述,Flux#startWith预先设置给定的序列。

由于您作为参数传递Flux.interval(Duration.ofSeconds(1)),它将每秒无限发出 long,并且您Flux.create的基于发布者将永远不会被订阅。

但是,如果您将其更改为:

.startWith(Mono.delay(Duration.ofSeconds(1)))

您也可以考虑将代码更改为:

Flux.<Integer> create(e -> {
   log.info("create");
   e.next(2);
})
   .startWith(1)
   .switchMap(v -> Flux.interval(Duration.ofSeconds(v)))
   .subscribe(e -> log.info("subscribe: {}", e));

在这里,我们startWithFlux.create块之后使用,并让switchMap处理它作为任何其他信号。

另外,请注意switchMap(v -> Flux.interval(Duration.ofSeconds(v)))读作:
“每 N 秒开始发射,其中 N 是最新发射的值”

如果您只需要“延迟”一次,也请考虑Mono.delay在此处使用。


推荐阅读