spring - Webflux 控制器'返回对象而不是 Mono'
问题描述
您好,我是 Webflux 的新手,我遵循构建反应式微服务的教程。在我的项目中,我遇到了以下问题。
我想为产品服务创建一个 crud api,下面是 Create 方法
@Override
public Product createProduct(Product product) {
Optional<ProductEntity> productEntity = Optional.ofNullable(repository.findByProductId(product.getProductId()).block());
productEntity.ifPresent((prod -> {
throw new InvalidInputException("Duplicate key, Product Id: " + product.getProductId());
}));
ProductEntity entity = mapper.apiToEntity(product);
Mono<Product> newProduct = repository.save(entity)
.log()
.map(mapper::entityToApi);
return newProduct.block();
}
问题是,当我从邮递员调用此方法时,我收到错误 “block()/blockFirst()/blockLast() 正在阻塞,线程 reactor-http-nio-3 不支持”但是当我使用 StreamListener这个电话工作正常。流侦听器从 rabbit-mq 通道获取事件
流监听器
@EnableBinding(Sink.class)
public class MessageProcessor {
private final ProductService productService;
public MessageProcessor(ProductService productService) {
this.productService = productService;
}
@StreamListener(target = Sink.INPUT)
public void process(Event<Integer, Product> event) {
switch (event.getEventType()) {
case CREATE:
Product product = event.getData();
LOG.info("Create product with ID: {}", product.getProductId());
productService.createProduct(product);
break;
default:
String errorMessage = "Incorrect event type: " + event.getEventType() + ", expected a CREATE or DELETE event";
LOG.warn(errorMessage);
throw new EventProcessingException(errorMessage);
}
}
}
我有两个问题。
- 为什么这适用于 StreamListener 而不是简单的请求?
- webflux 中是否有适当的方法来返回 Mono 的对象,或者我们总是必须返回 Mono?
解决方案
您的 create 方法希望看起来更像这样,并且您希望Mono<Product>
从控制器返回 a 而不是单独的对象。
public Mono<Product> createProduct(Product product) {
return repository.findByProductId(product.getProductId())
.switchIfEmpty(Mono.just(mapper.apiToEntity(product)))
.flatMap(repository::save)
.map(mapper::entityToApi);
}
正如@Thomas 评论的那样,您正在破坏反应式编码的一些基础知识,并且没有通过使用 block() 获得好处,应该更多地阅读它。例如,您正在使用的响应式 mongo 存储库将返回一个 Mono,如果它是空的,它有自己的处理方法,而无需使用 Optional,如上所示。
如果实体已经存在,则编辑以映射到错误,否则保存
public Mono<Product> createProduct(Product product) {
return repository.findByProductId(product.getProductId())
.hasElement()
.filter(exists -> exists)
.flatMap(exists -> Mono.error(new Exception("my exception")))
.then(Mono.just(mapper.apiToEntity(product)))
.flatMap(repository::save)
.map(mapper::entityToApi);
}