Unit testing a Spring WebFlux project that connects to ElasticSearch

Problem description

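I'm currently working on a Spring WebFlux project that connects to ElasticSearch. I have a REST service that in turn calls a method in the service layer, which connects to ES. I'm having trouble writing unit tests for my service layer. Any help would be appreciated, as this is my first time using reactive programming. Below are code snippets of my controller and service methods.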

Controller code:

@GetMapping(path = "/api/apis/services/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
  // Bind the {id} template variable explicitly, since the parameter name differs from it.
  Flux<ClassA> serviceApis(@PathVariable("id") final String serviceKey) {
        return apiService.getDataForService(serviceKey);
  }

Service layer:

@PreAuthorize("isFullyAuthenticated()")
    public Flux<ClassA> getDataForService(
            final String id) {

        IdentityToken token = GSLSecurityContext.getCurrentUserIdentityToken();
        if (token == null) {
            return Flux.error(new Exception("No token found"));
        }

        String securityQueryJson = getSecurityShould(token);

        String queryToRun = QUERY
                .replace("XXX_SIZE_XXX", config.getValueAsString("scroll.size"))
                .replace("XXX_SECURITY_SHOULD_XXX", securityQueryJson)
                .replace("XXX_SERVICE_KEY_XXX", id);

        WebClient client = ClientUtil.getDataLakeWebClient(config);
        Flux<ClassA> response = getData(client, queryToRun);
        return response;
    }

The getData code is as follows:

protected Flux<ClassA> getData(
        final WebClient client,
        final String queryToRun) {
    String scrollTimeoutQuery = "?scroll=" + config.getValueAsString("scroll.timeout");
    long timeout = config.getValueAsLong("query.timout");
    return Flux.generate(
        () -> null,
        (scrollId, sink) -> {
            ClassAWrapper lastWrapper = null;
            if (scrollId == null) {
                Mono<ClassAWrapper> wrapper = client.post()
                    .uri(getSearchURI() + scrollTimeoutQuery)
                    .body(BodyInserters.fromObject(queryToRun)).retrieve()
                    .bodyToMono(ClassAWrapper.class)
                    .onErrorMap(original -> new Exception("Unable to retrieve from elastic search for query " + queryToRun, original))
                    .log();
                try {
                    lastWrapper = wrapper.block(Duration.ofSeconds(timeout));
                } catch (IllegalStateException ex) {
                    LOG.error("Timed out after " + timeout + " seconds while getting data from elastic search for query " + queryToRun);
                    lastWrapper = null;
                } catch (Exception ex) {
                    LOG.error("Error in getting message details",ex);
                    lastWrapper = null;
                } 
            } else {
                String scrollQuery = "{\"scroll\" : \"" + config.getValueAsString("scroll.timeout") + "\", \"scroll_id\" : \"" + scrollId + "\"}";
                Mono<ClassAWrapper> wrapper = client.post()
                        .uri("_search/scroll")
                        .body(BodyInserters.fromObject(scrollQuery)).retrieve()
                        .bodyToMono(ClassAWrapper.class)
                        .onErrorMap(original -> new Exception("Unable to retrieve next page of data from elastic search", original))
                        .log();
                try {
                    lastWrapper = wrapper.block(Duration.ofSeconds(timeout));
                } catch (IllegalStateException ex) {
                    LOG.error("Timed out after " + timeout + " seconds while getting data from elastic search for query " + queryToRun);
                    lastWrapper = null;
                } catch (Exception ex) {
                    LOG.error("Error in getting message details",ex);
                    lastWrapper = null;
                } 
            }
            if (lastWrapper == null || lastWrapper.getResult() == null || lastWrapper.getResult().getDetails().isEmpty()) {
                sink.next(new ClassA());
                sink.complete();
                return null;
            }
            sink.next(lastWrapper.getResult());
            return lastWrapper.getScrollId();
        }
    );
}

Here, queryToRun is the ES query to execute, and config is the configuration. I need to test the method getDataForService().

Tags: unit-testing, spring-webflux, reactive

Solution
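
One possible starting point, offered as a sketch rather than a definitive answer: the part of getData that is hardest to test is the scroll loop, where the state is the scroll id (null for the first request) and the loop stops at the first null or empty page. If the HTTP call is hidden behind a function, that loop can be exercised with a fake and no ElasticSearch instance. The plain-Java example below mirrors that loop without Reactor. The names Page, ScrollPagingSketch, collectAll and fakePages are hypothetical and do not appear in the question, and the sketch leaves out the empty ClassA that the original emits before completing.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for ClassAWrapper: one page of results plus
// the scroll id to send with the next request.
final class Page {
    final String scrollId;
    final List<String> details;

    Page(String scrollId, List<String> details) {
        this.scrollId = scrollId;
        this.details = details;
    }
}

final class ScrollPagingSketch {

    // Mirrors the generator in getData: the state is the scroll id
    // (null for the first request); paging stops at the first null
    // or empty page. fetchPage stands in for the WebClient call.
    static List<String> collectAll(Function<String, Page> fetchPage) {
        List<String> results = new ArrayList<>();
        String scrollId = null;
        while (true) {
            Page page = fetchPage.apply(scrollId);
            if (page == null || page.details.isEmpty()) {
                return results;
            }
            results.addAll(page.details);
            scrollId = page.scrollId;
        }
    }

    // Fake backend for tests: two pages of data, then an empty page.
    static Function<String, Page> fakePages() {
        Page first = new Page("s1", Arrays.asList("a", "b"));
        Map<String, Page> later = new HashMap<>();
        later.put("s1", new Page("s2", Arrays.asList("c")));
        later.put("s2", new Page("s3", Collections.<String>emptyList()));
        return scrollId -> scrollId == null ? first : later.get(scrollId);
    }
}
```

Calling ScrollPagingSketch.collectAll(ScrollPagingSketch.fakePages()) walks all three fake pages and returns the combined results. In the real service, the same idea would mean injecting the page-fetching function (or the WebClient) instead of building it inside getDataForService(), and then asserting on the returned Flux, for example with StepVerifier from reactor-test.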

