首页 > 解决方案 > 消耗每个元素后向 Flux 添加副作用?

问题描述

这是一个简化的例子,但它说明了这一点。

假设我有一个这样定义的方法:

Flux<String> generateFlux() {
  return Flux.just("hello", "world"); // (S)
}

有没有办法在一个元素被消耗S打印一些东西,而通量的订阅者不必做或知道任何事情?

例如,我要修改为S

generateFlux().doOnNext(System.out::println).block()

实际上将其打印到控制台:

hello
consumed
world
consumed

是否可以使用 Reactor 3.3 来做到这一点,如果可以的话怎么做?

更新

我想我的问题不清楚,所以我会添加更多细节。我的问题的要点是我希望它保持不变

generateFlux().doOnNext(System.out::println).block() // A

所以我想修改S,即Flux.just("hello", "world")适应我的要求,而不是A. 我知道你可以添加subscribedoOnNextA但这不是我要问的。我在问是否有办法修改S,以便在没有对 进行任何更改的情况下,当订阅者/消费者处理完所有项目A时,这将被打印出来:A

hello
consumed
world
consumed

即以S某种方式接收到第一个元素已被消耗的信号(“hello”),然后它可以执行一个doOnNext(或其他),在这种情况下打印“consumer”。当第二个元素被消耗(“世界”)时,它应该再次打印“消耗”。

这可能吗?

标签: javaspringreactive-programmingproject-reactor

解决方案


您可以使用 doOnNext 回调来拦截发布者推送下一个元素的动作。但使用订阅而不是阻止

generateFlux()
.doOnNext(next => System.out.println("Next Element pushed by publisher" + next))
.subscribe();

您可以使用覆盖 onNext、onError 和 onCompleted 的所有行为的订阅签名。subscribe 方法在 Flux 中被重载


推荐阅读