首页 > 解决方案 > Optimal orchestration of list of network calls and processing tasks in Java

问题描述

I have the following workflow: There are n records that need to be retrieved over the network and subsequently n expensive computations that need to be done on each. Put in code, this would look like:

List<Integer> ids = {1,2,....n};
ids.forEach(id -> {
    Record r = RetrieveRecord(id); // Blocking IO
    ProcessRecord(r); // CPU Intensive
})

I would like to convert the blocking part into async so that the time is minimized with a single thread- essentially, by ensuring that record i+1 is being retrieved when record i is being processed. So that the execution would look like:

Retrieve(1).start()
Retrieve(1).onEnd(() -> { start Retrieve(2), Process(1) })
Retrieve(2).onEnd(() -> { start Retrieve(3), Process(2) })
....

Now I can come up with the naive way to implement this with a List<> and CompletableFuture, but this would require me to handle the first record differently.

Is there a more elegant way of solving this with something like reactive streams? A solution that would maybe let me easily configure how many records Process() can trail behind Retreive()?

标签: javaasynchronousstream

解决方案


因此,您有 N 个任务并希望并行运行它们,但同时运行不超过 K 个任务。最自然的方法是最初有一个任务生成器和一个具有 K 权限的权限计数器。任务生成器创建 K 个任务并等待更多权限。每个权限都归某个任务所有,并在任务结束时返回。Java 中的标准权限计数器是 class java.util.concurrent.Semaphore

List<Integer> ids = {1,2,....n};
Semaphore sem = new Semaphore(K);
ids.forEach(id -> {
    sem.aquire();
    CompletableFuture<Data> fut = Retrieve(id);
    fut.thenRun(sem::release);
    fut.thenAcceptAsync(this::ProcessRecord, someExecutor);
})

由于任务生成器只占用一个线程,使其异步的意义不大。但是,如果您不想为任务生成器使用专用线程并想实现异步解决方案,那么主要问题是什么类可以起到异步权限计数器的作用。您有 3 个选项:

  • 使用隐式异步权限计数器,它是反应流的一部分,在 RxJava、项目 Reactor 等中找到。
  • 使用org.df4j.core.boundconnector.permitstream.Semafor包含在我的异步库df4j中的显式异步信号量
  • 自己做

推荐阅读