spring - 用反应堆开火即忘
问题描述
我的 Spring Boot 应用程序中有如下方法。
public Flux<Data> search(SearchRequest request) {
Flux<Data> result = searchService.search(request);//this returns Flux<Data>
Mono<List<Data>> listOfData = result.collectList();
// doThisAsync() // here I want to pass this list and run some processing on it
// the processing should happen async and the search method should return immediately.
return result;
}
//this method uses the complete List<Data> returned by above method
public void doThisAsync(List<Data> data) {
//do some processing here
}
目前,我正在使用@Async
带注释的服务类doThisAsync
,但不知道如何通过List<Data>
,因为我不想调用block
. 我只有Mono<List<Data>>
.
我的主要问题是如何单独处理这个 Mono 并且该search
方法应该返回Flux<Data>
.
解决方案
1,如果您的即发即弃已经异步返回Mono
/Flux
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> doThisAsync(data).subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public Mono<Void> doThisAsync(List<Data> data) {
//do some async/non-blocking processing here like calling WebClient
}
2,如果您的即发即弃确实阻塞了 I/O
public Flux<Data> search(SearchRequest request)
{
return searchService.search(request)
.collectList()
.doOnNext(data -> Mono.fromRunnable(() -> doThisAsync(data))
.subscribeOn(Schedulers.elastic()) // delegate to proper thread to not block main flow
.subscribe()) // add error logging here or inside doThisAsync
.flatMapMany(Flux::fromIterable);
}
public void doThisAsync(List<Data> data) {
//do some blocking I/O on calling thread
}
请注意,在上述两种情况下,您都会失去背压支持。如果由于doAsyncThis
某种原因速度变慢,那么数据生产者不会关心并继续生产项目。这是火与火机制的自然结果。
推荐阅读
- python - 如何在python中按字母顺序排序而不重复字母?
- ruby-on-rails - Ruby on rails 应用程序不发送电子邮件
- ios - 如何在不触摸或点击文本字段的情况下从动态创建的 UITextField 获取文本(禁用用户交互)?
- excel - 基于 Excel 电子表格中的输入的订单表
- git - CircleCI 子模块存储库访问被拒绝。部署密钥未与请求的存储库关联
- html - 固定div下的div
- css - 未在 https 中加载的 font-awesome 会在 Web 浏览器控制台中出现错误,但不会在 http 中出现错误?怎么修
- c# - 如何将这些 Fortran 语句转换为 C#?
- java - IntelliJ - 带参数的 Java Gradle 运行/调试
- ios - UserDefaults 自动登录不起作用