首页 > 解决方案 > 为 Object Flux 实现 Upsert 功能

问题描述

我有一个包含对象的通量请求。任务是创建(如果不存在)或更新(如果存在)这些对象。我已经尝试实现以下目标:

requestFlux.flatMap(entry -> {
        Mono<Optional<MyObject>> existingMapping = myRepository
            .findByIdAndName(entry.getId(), entry.getName());
        return existingMapping.map(optional -> {
        if (optional.isPresent()) {

            MyObject model = MyObjectMapper.updateMyObject(entry, optional.get()); 
            myRepository.save(model);
            return model;
        } else {

            MyObject model = MyObjectMapper.toMyObjectModel(entry);
            myRepository.save(model);
            return model;
        }
        });

    });

由于某种原因,这似乎不起作用。它不会引发任何错误,不会创建/更新记录或记录 if/else 中存在的消息。想知道我错过了什么吗?

标签: rx-javaspring-webfluxproject-reactornonblocking

解决方案


您必须在一次更改中绑定所有内容。不要打破链条。

public Mono<ServerResponse> createOrUpdateBulkMapping(ServerRequest serverRequest) {

    Flux<RequestObjects> requestFlux = serverRequest.bodyToFlux(RequestObjects.class);
    List<MyObjects> createdList = new ArrayList<>();
    List<MyObjects> updatedList = new ArrayList<>();

    return requestFlux
            .flatMap(entry -> {

                return myRepository.findByIdAndName(entry.getId(),entry.getName())
                       .flatMap(optional -> {
                          updatedList.add(entry);
                          MyObject model = MyObjectMapper.updateMyObject(entry, optional.get());
                          return myRepository.save(model);
                       })
                       .switchIfEmpty(saveNewModel(entry, createdList));

            })
            .then(ServerResponse.ok().body(Mono.just("Bulk Create/Update is successful").log(), String.class));
}

private Mono<MyObject> saveNewModel (RequestObjects entry, List<MyObjects> createdList) {
  createdList.add(entry);
  MyObject model = MyObjectMapper.toMyObjectModel(entry);
  return myRepository.save(model);
}

编辑:作为您在此处提供的要点:https ://gist.github.com/vkrmsngh43/13a8753848893a0f6988d8327f656220答案已更改


推荐阅读