java - 消耗每个元素后向 Flux 添加副作用?
问题描述
这是一个简化的例子,但它说明了这一点。
假设我有一个这样定义的方法:
Flux<String> generateFlux() {
return Flux.just("hello", "world"); // (S)
}
有没有办法在一个元素被消耗后S
打印一些东西,而通量的订阅者不必做或知道任何事情?
例如,我要修改为S
:
generateFlux().doOnNext(System.out::println).block()
实际上将其打印到控制台:
hello
consumed
world
consumed
是否可以使用 Reactor 3.3 来做到这一点,如果可以的话怎么做?
更新
我想我的问题不清楚,所以我会添加更多细节。我的问题的要点是我希望它保持不变:
generateFlux().doOnNext(System.out::println).block() // A
所以我想修改S
,即Flux.just("hello", "world")
适应我的要求,而不是A
. 我知道你可以添加subscribe
等doOnNext
,A
但这不是我要问的。我在问是否有办法修改S
,以便在没有对 进行任何更改的情况下,当订阅者/消费者处理完所有项目A
时,这将被打印出来:A
hello
consumed
world
consumed
即以S
某种方式接收到第一个元素已被消耗的信号(“hello”),然后它可以执行一个doOnNext
(或其他),在这种情况下打印“consumer”。当第二个元素被消耗(“世界”)时,它应该再次打印“消耗”。
这可能吗?
解决方案
您可以使用 doOnNext 回调来拦截发布者推送下一个元素的动作。但使用订阅而不是阻止
generateFlux()
.doOnNext(next => System.out.println("Next Element pushed by publisher" + next))
.subscribe();
您可以使用覆盖 onNext、onError 和 onCompleted 的所有行为的订阅签名。subscribe 方法在 Flux 中被重载
推荐阅读
- c++ - PortAudio 测试应用程序 - 未解析的外部符号 Pa_GetVersionInfo
- api - Microsoft Graph API MFA 状态报告
- ssl - 具有 TLS 发起 CERTIFICATE_VERIFY_FAILED 的 Istio Egress 网关
- vue.js - Vue Enterprise Boilerplate:sass-loader 无法从更新的 .scss 文件中加载
- angular - JWT Helper 服务在 Angular 10 中引发错误
- php - 如何让数据允许小数
- python - 如何并行写入numpy数组中的同一单元格?
- powershell - 如何在 PowerShell 的脚本块中运行模块?
- python - 合并两个动画条形图 Plotly
- java - 通过 HtmlUnit 下载 CSV 并读入数组