unit-testing - 连接到 ElasticSearch 单元测试的 Spring WebFlux 项目
问题描述
我目前正在开发一个连接到 ElasticSearch 的 Spring WebFlux 项目。我有一个 Rest Service,它依次调用连接到 ES 的服务层中的方法。我在为我的服务层编写单元测试时遇到问题。任何帮助将不胜感激,因为这是我第一次使用响应式编程。下面是我的 Controller 和 Service 方法的代码片段。
控制器代码:
@GetMapping(path = "/api/apis/services/{id}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Flux<ClassA> serviceApis(@PathVariable final String serviceKey) {
return apiService.getDataForService(serviceKey);
}
服务层:
@PreAuthorize("isFullyAuthenticated()")
public Flux<ClassA> getDataForService(
final String id) {
IdentityToken token = GSLSecurityContext.getCurrentUserIdentityToken();
if (token == null) {
return Flux.error(new Exception("No token found"));
}
String securityQueryJson = getSecurityShould(token);
String queryToRun = QUERY
.replace("XXX_SIZE_XXX", config.getValueAsString("scroll.size"))
.replace("XXX_SECURITY_SHOULD_XXX", securityQueryJson)
.replace("XXX_SERVICE_KEY_XXX", id);
WebClient client = ClientUtil.getDataLakeWebClient(config);
Flux<ClassA> response = getData(client, queryToRun);
return response;
}
getData 代码如下:
protected Flux<ClassA> getData(
final WebClient client,
final String queryToRun) {
String scrollTimeoutQuery = "?scroll=" + config.getValueAsString("scroll.timeout");
long timeout = config.getValueAsLong("query.timout");
return Flux.generate(
() -> null,
(scrollId, sink) -> {
ClassAWrapper lastWrapper = null;
if (scrollId == null) {
Mono<ClassAWrapper> wrapper = client.post()
.uri(getSearchURI() + scrollTimeoutQuery)
.body(BodyInserters.fromObject(queryToRun)).retrieve()
.bodyToMono(ClassAWrapper.class)
.onErrorMap(original -> new Exception("Unable to retrieve from elastic search for query " + queryToRun, original))
.log();
try {
lastWrapper = wrapper.block(Duration.ofSeconds(timeout));
} catch (IllegalStateException ex) {
LOG.error("Timedout after " + timeout + " seconds while getting data from elastic search for query " + queryToRun);
lastWrapper = null;
} catch (Exception ex) {
LOG.error("Error in getting message details",ex);
lastWrapper = null;
}
} else {
String scrollQuery = "{\"scroll\" : \"" + config.getValueAsString("scroll.timeout") + "\", \"scroll_id\" : \"" + scrollId + "\"}";
Mono<ClassAWrapper> wrapper = client.post()
.uri("_search/scroll")
.body(BodyInserters.fromObject(scrollQuery)).retrieve()
.bodyToMono(ClassAWrapper.class)
.onErrorMap(original -> new Exception("Unable to retrieve next page of data from elastic search", original))
.log();
try {
lastWrapper = wrapper.block(Duration.ofSeconds(timeout));
} catch (IllegalStateException ex) {
LOG.error("Timeout after " + timeout + " seconds while getting data from elastic search for query " + queryToRun);
lastWrapper = null;
} catch (Exception ex) {
LOG.error("Error in getting message details",ex);
lastWrapper = null;
}
}
if (lastWrapper == null || lastWrapper.getResult() == null || lastWrapper.getResult().getDetails().isEmpty()) {
sink.next(new ClassA());
sink.complete();
return null;
}
sink.next(lastWrapper.getResult());
return lastWrapper.getScrollId();
}
);
}
这里,queryToRun 是要执行的 ES 查询。config 是配置。我需要测试方法“getDataForService()”。
解决方案
推荐阅读
- arrays - 将 hive 数组列转换为 map 列
- java - 如何将java数据放入json文件?
- docker - docker run -v 'pwd':/home ,似乎不起作用
- java - VerticalBisector 方法的结果
- java - 如何仅过滤使用我的自定义注释进行注释的 api
- node.js - 如何为 Hyperledger-fabric 开发环境安装 node js?
- apache - 仅当内容嵌入另一个站点时,如何允许访问文件夹?
- android - 如何使用 bundletool 从 AAB 生成 APK(不是 AKS)?
- ruby-on-rails - 为什么我会收到 ActiveRecord::ConnectionTimeoutError?
- ios - 找不到 `ActionCableClient` 的 podspec。Pod 安装问题