java - 在 Spring Webflux 中执行阻塞 JDBC 调用
问题描述
我正在使用 Spring Webflux 和 Spring data jpa,使用 PostgreSql 作为后端数据库。我不想在进行 db 调用时阻塞主线程find
and save
。为了达到同样的目的,我在Controller
课堂上有一个主调度程序和一个jdbcScheduler
服务类。
我定义它们的方式是:
@Configuration
@EnableJpaAuditing
public class CommonConfig {
@Value("${spring.datasource.hikari.maximum-pool-size}")
int connectionPoolSize;
@Bean
public Scheduler scheduler() {
return Schedulers.parallel();
}
@Bean
public Scheduler jdbcScheduler() {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
}
@Bean
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
return new TransactionTemplate(transactionManager);
}
}
现在,在我的服务层中进行获取/保存调用时,我会:
@Override
public Mono<Config> getConfigByKey(String key) {
return Mono.defer(
() -> Mono.justOrEmpty(configRepository.findByKey(key)))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
@Override
public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) {
return Flux
.fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
@Override
public Flux<Config> addConfigs(List<Config> configList) {
return Flux.fromIterable(configRepository.saveAll(configList))
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
}
在控制器中,我这样做:
@PostMapping
@ResponseStatus(HttpStatus.CREATED)
Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) {
return configService.addConfigs(configs).collectList()
.map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
.subscribeOn(scheduler);
}
这个对吗?和/或有更好的方法吗?
我的理解是:
.subscribeOn(jdbcScheduler)
.publishOn(scheduler);
是该任务将在jdbcScheduler
线程上运行,稍后的结果将在我的主并行上发布scheduler
。这种理解正确吗?
解决方案
您对publishOn
and的理解是正确的subscribeOn
(请参阅 reactor 项目中有关这些操作员的参考文档)。
如果你调用阻塞库而不调度特定调度器的工作,这些调用将阻塞少数可用线程之一(默认情况下,Netty 事件循环),并且你的应用程序将只能同时服务几个请求。
现在我不确定你这样做是为了达到什么目的。
首先,并行调度程序是为 CPU 密集型任务设计的,这意味着您将拥有很少的任务,与 CPU 内核一样多(或更多)。在这种情况下,这就像将您的线程池大小设置为常规 Servlet 容器上的内核数。您的应用将无法处理大量并发请求。
即使您选择了更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环,后者是在 Spring WebFlux 中本地调度请求处理的地方。
如果您的最终目标是性能和可扩展性,那么在响应式应用程序中封装阻塞调用可能比您的常规 Servlet 容器执行得更差。
您可以改为使用 Spring MVC 并且:
- 在处理阻塞库时使用通常的阻塞返回类型,比如 JPA
- 当您不依赖于此类库时使用
Mono
和返回类型Flux
这不会是非阻塞的,但这仍然是异步的,您将能够并行执行更多工作而无需处理复杂性。
推荐阅读
- codenameone - Rest API 抛出 JSON 解析错误:JSON 解析期间出现异常,行:1 列:1 缓冲区
- excel - 功率枢轴中的 R 平方测量
- python - 生成区块链标头失败并出现错误
- uicollectionviewcell - 如果快速滚动CollectionView swift 5,则单元格数据错误
- mysql - Mysql每线程内存,变量导致线程内存使用?
- android - Android应用权限设置
- javascript - Vue计算属性不向模板发送数据
- python - 使用 Python 管理所有路径格式
- java - android中attrs的不同用途是什么?
- c++ - std::thread 的 lambda 移动捕获