首页 > 解决方案 > 通过并发任务的数量限制异步 Mono(不是基于时间的)

问题描述

假设我有一个方法,它接受一个参数并返回一个Mono<Integer>异步完成的方法。例如:

Random random = new Random();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(16);

Mono<Integer> fetch(String a) {
  return Mono.create(em -> {
    scheduledExecutorService.schedule(() -> em.next(a + " result"), 
      10 + random.nextInt(50), TimeUnit.MILLISECONDS);
  });
}

假设我有一个Flux<String>可以输入上述fetch方法并且可以包含很多元素的方法。

有没有一种方法可以确保该方法被并行调用,但将并发调用的数量限制为预定义的数量?

例如上面例子中的 4 个,而我有 16 个可用线程 - 所以从这个角度来看,我总是保留 12 个备用线程。

标签: javaproject-reactor

解决方案


假设通过“feed into”,您的意思是您正在使用flux.flatMap(this::fetch),那么您可以通过调用来设置 flatMap 并发flux.flatMap(this::fetch, 4)

此外,您的代码有两个编译错误:

  1. fetch 的返回类型与您提供给 sink ( )Mono<Integer>的项目类型不匹配。a + " result"我想你的意思是Mono<String>
  2. MonoSink 没有.next方法。我想你的意思是.success

鉴于所有这些,这里有一个例子:

    private Flux<String> fetchAll() {
        return Flux.range(0, 50)
                .map(i -> Integer.toString(i))
                .flatMap(this::fetch, 4);

    }

    private Mono<String> fetch(String a) {
        return Mono.create(em ->
                scheduledExecutorService.schedule(() -> em.success(a + " result"),
                        10 + random.nextInt(50), TimeUnit.MILLISECONDS)
        );
    }


推荐阅读