Why doesn't WebFlux return values as soon as they are generated?

Problem description

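As I understand it, when a Spring WebFlux endpoint returns a Flux of values, each value should be printed and sent to the client as soon as it is generated. In my tests, however, the client receives all of the values together, and only after every one of them has been produced (sample code below). Am I doing something wrong, or is this the expected behaviour and I have misunderstood the reactive concept?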

Server side

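import java.util.HashMap;
import java.util.UUID;

import lombok.Data;
import lombok.RequiredArgsConstructor;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.CrossOrigin;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import static java.util.UUID.randomUUID;

@SpringBootApplication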
public class ReactiveResponseExperimentApplication {

    public static void main(String[] args) {
        SpringApplication.run(ReactiveResponseExperimentApplication.class, args);
    }

}


@RestController
@RequiredArgsConstructor
class ProductController {

    private final BobikService bobikService;

    @CrossOrigin
    @PostMapping("/bobiks")
    @ResponseStatus(HttpStatus.CREATED)
    public Mono<Bobik> createBobik(@RequestBody Bobik bobik) {
        return bobikService.save(bobik);
    }

    @CrossOrigin
    @GetMapping("/bobiks")
    public Flux<Bobik> getAllBobiks() {
        return bobikService.getAllBobiks();
    }

    @CrossOrigin
    @GetMapping("/bobiks/{id}")
    public Flux<Bobik> getBobikById(@PathVariable UUID id) {
        return bobikService.getBobikById(id);
    }
}

@Component
class BobikService {

    private HashMap<UUID, Bobik> dao = new HashMap<>();

    public Mono<Bobik> save(Bobik bobik) {
        bobik.setId(randomUUID());
        bobik.setStatus(Status.CREATED);
        dao.put(bobik.getId(), bobik);
        return Mono.just(bobik);
    }

    public Flux<Bobik> getAllBobiks() {
        return Flux.fromIterable(dao.values());
    }

    public Flux<Bobik> getBobikById(UUID id) {
        Bobik bobik = dao.get(id);
        if (bobik != null) {
            return Flux.create(fluxSink -> {
                boolean sendingInProgress = true;
                while(sendingInProgress) {
                    try {
                        Thread.sleep(4000);
                        if (bobik.getStatus() == Status.OPTIMISED) {
                            fluxSink.complete();
                            sendingInProgress = false;
                            System.out.println("---> sending completed");
                        } else {
                            bobik.setStatus(Status.values()[bobik.getStatus().ordinal() + 1]);
                            System.out.println("---> send Bobik with status: " + bobik.getStatus());
                            fluxSink.next(bobik);
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        fluxSink.error(e);
                    }
                }
            });
        }
        return Flux.empty();
    }
}

@Data
class Bobik {

    private UUID id;
    private String name;
    private Status status;

}

enum Status {

    CREATED,
    UPLOADED,
    CROPPED,
    OPTIMISED,

}

Client side

    reactBobik() {
        console.debug(`--- going to react with bobik ${this.bobikId}`);
        this.http
            .get<Bobik>(`http://localhost:8080/bobiks/${this.bobikId}`)
            .subscribe(
                (nextBobik) => {
                    console.info(`--- next bobik is here with status ${nextBobik.status}`);
                    console.info(nextBobik);
                },
                (error) => {
                    console.error(`error occurred during fetching bobiks`, error);
                },
                () => {
                    console.info(`--- that's all, folks!`)
                }
            );
    }

Server log

---> send Bobik with status: UPLOADED
---> send Bobik with status: CROPPED
---> send Bobik with status: OPTIMISED
---> sending completed

Client log

--- going to react with bobik 079b41eb-ebf3-4d7a-a520-1b9b4bfdae4c
--- next bobik is here with status undefined
Array(3) [ {…}, {…}, {…} ]
--- that's all, folks!

Tags: spring, observable, reactive-programming, spring-webflux

Solution
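
The client log is the main clue: a single callback that receives Array(3) means the three elements arrived as one JSON array. The mapping declares no produces attribute, so the response is most likely negotiated as application/json. For that media type Spring WebFlux collects the whole Flux and serializes it as one array once the stream completes. With a streaming media type such as text/event-stream (or application/x-ndjson), each element is written and flushed as it is emitted.

A likely fix, offered as a suggestion rather than a verified answer from the original thread, is to declare produces = MediaType.TEXT_EVENT_STREAM_VALUE on the getBobikById mapping and read the stream in the browser with EventSource, because HttpClient.get in Angular only emits the body once the response has completed. Separately, Thread.sleep inside Flux.create blocks the subscribing thread; a timer-based operator such as Flux.interval avoids that.

The difference between the two framings can be sketched in plain Java. This only illustrates what goes over the wire and is not Spring's actual encoder; the class ResponseFraming and its method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only: it mimics how a sequence of
// already-serialized JSON elements is framed under two media types.
class ResponseFraming {

    // application/json: all elements are joined into one array,
    // so the client receives a single payload.
    static List<String> asJsonArray(List<String> elements) {
        return List.of("[" + String.join(",", elements) + "]");
    }

    // text/event-stream: every element is framed as its own event
    // (a "data:" line followed by a blank line), so each one can be
    // sent on its own.
    static List<String> asServerSentEvents(List<String> elements) {
        List<String> events = new ArrayList<>();
        for (String element : elements) {
            events.add("data:" + element + "\n\n");
        }
        return events;
    }

    public static void main(String[] args) {
        List<String> bobiks = List.of(
                "{\"status\":\"UPLOADED\"}",
                "{\"status\":\"CROPPED\"}",
                "{\"status\":\"OPTIMISED\"}");

        System.out.println("JSON array payloads: " + asJsonArray(bobiks).size());
        System.out.println("SSE events: " + asServerSentEvents(bobiks).size());
    }
}
```

With three elements the first method yields one payload and the second yields three separate events. That matches the single Array(3) in the client log, as opposed to the one callback per status that the question expected.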

