project-reactor - flatMap vs map,基本解释没问题,但是当我的转换函数本身不同步时会发生什么?
问题描述
我喜欢整个网络上对 reactor 中复杂概念的基本解释,它们在生产代码中并不是特别有用,所以下面我编写的一段代码使用 reactor kafka + spring boot 向 kafka 发送消息:
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;
import java.util.Properties;
public class CallbackSender {
private ObjectMapper objectMapper;
private String topic;
private static final Logger log = LoggerFactory.getLogger(CallbackSender.class.getName());
private final KafkaSender<String, String> sender;
public CallbackSender(ObjectMapper objectMapper, Properties senderProps, String topic) {
this.sender = KafkaSender.create(SenderOptions.create(senderProps));
this.objectMapper = objectMapper;
this.topic = topic;
}
public Mono<SenderResult<String>> sendMessage(ProcessContext<? extends AbstractMessage> processContext) throws JsonProcessingException {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic,
objectMapper.writeValueAsString(processContext.getMessage()));
SenderRecord<String, String, String> senderRecord = SenderRecord.create(producerRecord, processContext.getId());
return sender.send(Flux.just(senderRecord))
.doOnError(e -> log.error("Send failed", e))
.last();
}
}
我无法理解的是,调用this.sendMessage
as.map
与.flatMap
从外部管道调用之间到底有什么区别,那么如果我的同步函数没有真正做任何同步操作,那么如何解释映射应用同步转换到发射的元素基本字段获取?
这里 Kafka sender 已经是响应式和异步的,所以我使用哪一个都没关系?这是正确的假设吗?
我的代码不是惯用的吗?
或者对于这个特殊情况,它只是我在里面所做的一切的安全包装,.sendMessage
以防.flatMap
将来有人会添加同步代码,即糖安全语法。
我的理解是,.map
在这种情况下将简单地准备返回 Mono 的管道,而外部调用管道的订阅者将触发整个多米诺骨牌效应,对吗?
解决方案
我无法理解的是,从外部管道将 this.sendMessage 称为 .map 与 .flatMap 之间到底有什么区别
map()
应用一个同步函数(即一个没有订阅或回调的“就地”函数)并按原样返回结果。flatMap()
应用异步转换器函数,并在完成后解开发布者。所以:
我的理解是,在这种情况下,.map 只会准备返回 Mono 的管道,而外部调用管道的订阅者将触发整个多米诺骨牌效应,对吗?
是的,这是正确的(如果“多米诺骨牌效应”是指返回的单声道将被订阅并返回其结果。)
那么,如果我的同步函数除了基本字段获取之外并没有真正做任何同步的事情,那么映射应用同步转换到发射元素的解释呢?
很简单,因为这就是你告诉它要做的事情。设置发布者本质上没有什么异步的,只是在订阅后执行(调用不会发生这种情况map()
。)
推荐阅读
- kubernetes - 如何使用 Kustomize 并创建一个环境,例如:“http://${namePrefix}service-a/some-path”或“jdbc:db2://${namePrefix}service-b:${dbPort}/${数据库名称}"
- emacs - 打开议程时组织模式挂起
- javascript - 这个算法更优化的解决方案是什么?我觉得我可以从这个问题中学到更多
- python - 3D numpy 数组上的 PCA 实现
- python - 如何根据pytorch中的另一个张量选择索引
- java - 找到最小切割数
- c# - 选择查询 linq to sql 函数
- python - 创建和使用列识别客户 Pyspark
- amazon-web-services - 限制 setup.py 中定义的依赖项的安装
- google-cloud-source-repos - 创建新存储库时出错 (PERMISSION_DENIED)