java - 如何将助焊剂链接到另一个助焊剂/单声道并施加另一个背压?
问题描述
我在反应堆核心中有以下使用通量的反应代码:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> redisHashReactiveCommands.hmset(key, map))
//.flatMap(... //want to store same data async into kafka with its own back pressure handling)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s))
.doOnComplete(() -> log.debug("On completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
如您所见,我对此流程的外部源(FluxSink.OverflowStrategy.LATEST)进行了背压处理。但是,我还想为我的进程配置背压到 redis (redisHashReactiveCommands.hmset(key, map)),因为它可能是比我的进程的外部源更大的瓶颈。我希望我需要为 redis 部分创建另一个通量并将其与该通量链接,但是由于 .flatMap 适用于单个项目而不是项目流,我该如何实现这一点?
另外,我也想将相同的发射项目存储到 Kafka 中,但是链接翻盖图似乎不起作用.. 有没有一种简单的方法可以在一组函数调用中将所有这些链接在一起(外部源 -> 我的流程,我的进程-> redis,我的进程-> kafka)?
解决方案
如果您对主序列中的结果对象不感兴趣,您可以在flatMap
. 您必须移动 subscribeOn 并在 flatMap 中登录,并将它们放在内部保存发布者上:
Flux.create(sink -> ... /* listens to and receives from external source */ , FluxSink.OverflowStrategy.LATEST)
.flatMap(map -> Mono.when(
redisHashReactiveCommands.hmset(key, map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Redis consumed. Result -> {}", s)),
kafkaReactiveCommand.something(map)
.subscribeOn(Schedulers.parallel())
.doOnNext(s -> log.debug("Kafka consumed. Result -> {}", s)),
))
//... this results in a Mono<Void>
.doOnComplete(() -> log.debug("Both redis and kafka completed."))
.doOnError(exception -> log.error("Error occurred while consuming message", exception))
.subscribe();
或者,如果您确定两个进程都发出结果元素或错误,则可以Tuple2
通过替换将两个结果when
合并为 a zip
。
推荐阅读
- publish - 在 Solace 主题上发布 JMS 消息
- react-native - React Native 中的自定义标签栏
- apache - 如何使用 Apache Lucenes Indexer 搜索来搜索日语单词?
- css - 如何创建如图所示的响应式网格设计?
- jira - 一个项目 JIRA Ticket 中的更改如何反映另一个项目中的链接工单
- sql - 在红移中将日期转换为 DD-MMM-YY
- user-interface - Octave GUI 开发
- windows - 将 wxWidgets 与 MinGW、CMake 和 Code::Blocks 一起使用
- c# - 在 asp.net 中获取客户端 Windows 用户 ID,用于 LDAP 单点登录
- node.js - 多个 RESTful API 服务的 NodeJS 身份验证