java - 通过并发任务的数量限制异步 Mono(不是基于时间的)
问题描述
假设我有一个方法,它接受一个参数并返回一个Mono<Integer>
异步完成的方法。例如:
Random random = new Random();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);
Mono<Integer> fetch(String a) {
return Mono.create(em -> {
scheduledExecutorService.schedule(() -> em.next(a + " result"),
10 + random.nextInt(50), TimeUnit.MILLISECONDS);
});
}
假设我有一个Flux<String>
可以输入上述fetch
方法并且可以包含很多元素的方法。
有没有一种方法可以确保该方法被并行调用,但将并发调用的数量限制为预定义的数量?
例如上面例子中的 4 个,而我有 16 个可用线程 - 所以从这个角度来看,我总是保留 12 个备用线程。
解决方案
假设通过“feed into”,您的意思是您正在使用flux.flatMap(this::fetch)
,那么您可以通过调用来设置 flatMap 并发flux.flatMap(this::fetch, 4)
。
此外,您的代码有两个编译错误:
- fetch 的返回类型与您提供给 sink ( )
Mono<Integer>
的项目类型不匹配。a + " result"
我想你的意思是Mono<String>
- MonoSink 没有
.next
方法。我想你的意思是.success
鉴于所有这些,这里有一个例子:
private Flux<String> fetchAll() {
return Flux.range(0, 50)
.map(i -> Integer.toString(i))
.flatMap(this::fetch, 4);
}
private Mono<String> fetch(String a) {
return Mono.create(em ->
scheduledExecutorService.schedule(() -> em.success(a + " result"),
10 + random.nextInt(50), TimeUnit.MILLISECONDS)
);
}
推荐阅读
- cocotb - cocotb示例代码开始工作的问题
- ruby - 保存禁止对列表的最有效数据结构
- python - status": 500, "body": "HTTP Error 400: Invalid URI: isHexDigit" 同时使用 skimage 将 s3 上的图像下载到本地
- javascript - 使用 django 将来自数组的数组中的数据传递给 chartjs
- php - php解析多级 - xml到数组
- ios - 如何在原生 iOS 文件应用程序中创建 ContextMenu 动画?
- javascript - Express:解析传递给 ejs 文件的解析对象的最安全方法:
- .net-core - 在本地调试 C# Azure 函数时静默终止进程
- css - React:sass-loader 没有将 SCSS 转换为 CSS
- javascript - 我可以将 .js 文件内容(或代码)导入 gatsby 中的 mdx 吗?