首页 > 解决方案 > How to customize subscribe() behaviour with a Sink in Project Reactor?

问题描述

For a Flux you can specify a custom action that happens upon subscription. For instance Flux.create(emitter -> someApi.setCallback(emitter::next)) will set some API hooks upon subscription.

How can we have a custom subscription action like this for a sink? E.g. Sinks.unsafe().many().unicast().onBackpressureBuffer(someAction)?


I've managed to make it work using Flux.concat(Mono.fromRunnable(someAction), sink) but I imagine that adds unnecessary overhead so not ideal.

标签: javareactive-programmingproject-reactor

解决方案


您可以使用doOnSubscribe运算符:

sink.asFlux().doOnSubscribe(someAction)

推荐阅读