mongodb - 使用 Reactive Mongo 和 Web 客户端的非阻塞功能方法
问题描述
ReactiveMongoRepository
我有一个使用接口从数据库中读取对象的微服务。
目标是获取这些对象中的每一个并将其推送到 AWS Lambda 函数(在将其转换为 DTO 之后)。如果该 lambda 函数的结果在 200 范围内,则将该对象标记为成功,否则忽略。
在过去简单的 Mongo 存储库和 RestTemplate 中,这将是一项微不足道的任务。但是,我正在尝试了解此 Reactive 交易,并避免阻塞。
这是我想出的代码,我知道我在阻止webClient
,但我该如何避免呢?
@Override
public Flux<Video> index() {
return videoRepository.findAllByIndexedIsFalse().flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
// Blocking call
final HttpStatus httpStatus = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.block()
.statusCode();
if (httpStatus.is2xxSuccessful()) {
video.setIndexed(true);
}
return videoRepository.save(video);
});
}
我从计划任务中调用上述内容,我并不真正关心 index() 方法的实际结果,只关心期间发生的事情。
@Scheduled(fixedDelay = 60000)
public void indexTask() {
indexService
.index()
.log()
.subscribe();
}
我已经阅读了很多关于该主题的博客文章等,但它们都只是简单的 CRUD 操作,中间没有发生任何事情,所以并没有真正让我全面了解如何实现这些东西。
有什么帮助吗?
解决方案
您的解决方案实际上非常接近。在这些情况下,您应该尝试逐步分解反应链,并且为了清晰起见,毫不犹豫地将位转换为独立的方法。
@Override
public Flux<Video> index() {
Flux<Video> unindexedVideos = videoRepository.findAllByIndexedIsFalse();
return unindexedVideos.flatMap(video -> {
final SearchDTO searchDTO = SearchDTO.builder()
.name(video.getName())
.canonicalPath(video.getCanonicalPath())
.objectID(video.getObjectID())
.userId(video.getUserId())
.build();
Mono<ClientResponse> indexedResponse = webClient.post()
.uri(URI.create(LAMBDA_ENDPOINT))
.body(BodyInserters.fromObject(searchDTO)).exchange()
.filter(res -> res.statusCode().is2xxSuccessful());
return indexedResponse.flatMap(response -> {
video.setIndexed(true);
return videoRepository.save(video);
});
});
推荐阅读
- java - 反应堆和数据库事务
- java - 如何转换 C++ 数组
到 JNIWrapper 中的 jfloatArray? - python - 如何对 Pandas 中的所有行进行排序
- firebase - 没有身份验证的 Firestore 安全规则
- python - 如何使用 saxonc.PySaxonProcessor 获取 XML 元素的属性值
- c++ - cout 不会输出到命令提示符,但会重定向到文本文件
- swift - UI 测试在 Xcode 11 中失败,但过去在 Xcode 10 中通过
- html - 我的标题按钮是错误的方式,但在代码中是正确的方式?
- algorithm - 找到所有边的最小最高成本的算法是什么?
- apache-flink - Flink 1.8.2 状态进化抛出异常