首页 > 解决方案 > flatMap vs map,基本解释没问题,但是当我的转换函数本身不同步时会发生什么?

问题描述

我喜欢整个网络上对 reactor 中复杂概念的基本解释,它们在生产代码中并不是特别有用,所以下面我编写的一段代码使用 reactor kafka + spring boot 向 kafka 发送消息:

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.sender.KafkaSender;
import reactor.kafka.sender.SenderOptions;
import reactor.kafka.sender.SenderRecord;
import reactor.kafka.sender.SenderResult;

import java.util.Properties;

public class CallbackSender {

    private ObjectMapper objectMapper;
    private String topic;

    private static final Logger log = LoggerFactory.getLogger(CallbackSender.class.getName());

    private final KafkaSender<String, String> sender;

    public CallbackSender(ObjectMapper objectMapper, Properties senderProps, String topic) {
        this.sender = KafkaSender.create(SenderOptions.create(senderProps));
        this.objectMapper = objectMapper;
        this.topic = topic;
    }

    public Mono<SenderResult<String>> sendMessage(ProcessContext<? extends AbstractMessage> processContext) throws JsonProcessingException {
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic,
                                                                             objectMapper.writeValueAsString(processContext.getMessage()));

        SenderRecord<String, String, String> senderRecord = SenderRecord.create(producerRecord, processContext.getId());

        return sender.send(Flux.just(senderRecord))
                     .doOnError(e -> log.error("Send failed", e))
                     .last();

    }

}

我无法理解的是,调用this.sendMessageas.map.flatMap从外部管道调用之间到底有什么区别,那么如果我的同步函数没有真正做任何同步操作,那么如何解释映射应用同步转换到发射的元素基本字段获取?

这里 Kafka sender 已经是响应式和异步的,所以我使用哪一个都没关系?这是正确的假设吗?

我的代码不是惯用的吗?

或者对于这个特殊情况,它只是我在里面所做的一切的安全包装,.sendMessage以防.flatMap将来有人会添加同步代码,即糖安全语法。

我的理解是,.map在这种情况下将简单地准备返回 Mono 的管道,而外部调用管道的订阅者将触发整个多米诺骨牌效应,对吗?

标签: project-reactor

解决方案


我无法理解的是,从外部管道将 this.sendMessage 称为 .map 与 .flatMap 之间到底有什么区别

map()应用一个同步函数(即一个没有订阅或回调的“就地”函数)并按原样返回结果。flatMap()应用异步转换器函数,并在完成后解开发布者。所以:

我的理解是,在这种情况下,.map 只会准备返回 Mono 的管道,而外部调用管道的订阅者将触发整个多米诺骨牌效应,对吗?

是的,这是正确的(如果“多米诺骨牌效应”是指返回的单声道将被订阅并返回其结果。)

那么,如果我的同步函数除了基本字段获取之外并没有真正做任何同步的事情,那么映射应用同步转换到发射元素的解释呢?

很简单,因为这就是你告诉它要做的事情。设置发布者本质上没有什么异步的,只是在订阅后执行(调用不会发生这种情况map()。)


推荐阅读