首页 > 解决方案 > 使用 Reactive Mongo 和 Web 客户端的非阻塞功能方法

问题描述

ReactiveMongoRepository我有一个使用接口从数据库中读取对象的微服务。

目标是获取这些对象中的每一个并将其推送到 AWS Lambda 函数(在将其转换为 DTO 之后)。如果该 lambda 函数的结果在 200 范围内,则将该对象标记为成功,否则忽略。

在过去简单的 Mongo 存储库和 RestTemplate 中,这将是一项微不足道的任务。但是,我正在尝试了解此 Reactive 交易,并避免阻塞。

这是我想出的代码,我知道我在阻止webClient,但我该如何避免呢?

@Override
public Flux<Video> index() {
    return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
        final SearchDTO searchDTO = SearchDTO.builder()
                .name(video.getName())
                .canonicalPath(video.getCanonicalPath())
                .objectID(video.getObjectID())
                .userId(video.getUserId())
                .build();

        // Blocking call
        final HttpStatus httpStatus = webClient.post()
                .uri(URI.create(LAMBDA_ENDPOINT))
                .body(BodyInserters.fromObject(searchDTO)).exchange()
                .block()
                .statusCode();

        if (httpStatus.is2xxSuccessful()) {
            video.setIndexed(true);
        }

        return videoRepository.save(video);
    });
}

我从计划任务中调用上述内容,我并不真正关心 index() 方法的实际结果,只关心期间发生的事情。

@Scheduled(fixedDelay = 60000)
public void indexTask() {
    indexService
            .index()
            .log()
            .subscribe();
}

我已经阅读了很多关于该主题的博客文章等,但它们都只是简单的 CRUD 操作,中间没有发生任何事情,所以并没有真正让我全面了解如何实现这些东西。

有什么帮助吗?

标签: mongodbspring-data-mongodbspring-webflux

解决方案


您的解决方案实际上非常接近。在这些情况下,您应该尝试逐步分解反应链,并且为了清晰起见,毫不犹豫地将位转换为独立的方法。

@Override
public Flux<Video> index() {

    Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
    return unindexedVideos.flatMap(video -> {
        final SearchDTO searchDTO = SearchDTO.builder()
                .name(video.getName())
                .canonicalPath(video.getCanonicalPath())
                .objectID(video.getObjectID())
                .userId(video.getUserId())
                .build();

        Mono<ClientResponse> indexedResponse = webClient.post()
            .uri(URI.create(LAMBDA_ENDPOINT))
            .body(BodyInserters.fromObject(searchDTO)).exchange()
            .filter(res -> res.statusCode().is2xxSuccessful());

        return indexedResponse.flatMap(response -> {
            video.setIndexed(true);
            return videoRepository.save(video);
        });
    });

推荐阅读