java - Optimal orchestration of list of network calls and processing tasks in Java
Problem description
I have the following workflow:
There are n records that need to be retrieved over the network, and then an expensive computation that needs to be done on each of them. Put in code, this would look like:
List<Integer> ids = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList()); // 1, 2, ..., n
ids.forEach(id -> {
    Record r = RetrieveRecord(id); // blocking I/O
    ProcessRecord(r);              // CPU-intensive
});
I would like to make the blocking part asynchronous so that the total time is minimized with a single thread, essentially by ensuring that record i+1 is being retrieved while record i is being processed. The execution would then look like:
Retrieve(1).start()
Retrieve(1).onEnd(() -> { start Retrieve(2), Process(1) })
Retrieve(2).onEnd(() -> { start Retrieve(3), Process(2) })
....
Now, I can come up with a naive way to implement this with a List<> and CompletableFuture, but this would require me to handle the first record differently.
Is there a more elegant way of solving this with something like reactive streams? Perhaps a solution that would let me easily configure how many records Process() can trail behind Retrieve()?
Solution
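So you have N tasks and want to run them in parallel, but with no more than K tasks running at the same time. The most natural way is to start with a task generator and a permit counter holding K permits. The task generator starts K tasks and then waits for more permits. Each permit is owned by some task and is returned when that task ends. The standard permit counter in Java is the class java.util.concurrent.Semaphore: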
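List<Integer> ids = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList()); // 1, 2, ..., n
Semaphore sem = new Semaphore(K); // K permits
// A plain loop rather than forEach: acquire() throws the checked InterruptedException,
// which a Consumer lambda cannot propagate.
for (Integer id : ids) {
    sem.acquire(); // the generator thread blocks here until a permit is free
    CompletableFuture<Data> fut = Retrieve(id);
    fut.whenComplete((data, error) -> sem.release()); // return the permit even if retrieval fails
    fut.thenAcceptAsync(this::ProcessRecord, someExecutor);
}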
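To make the semaphore approach above concrete, here is a minimal runnable sketch. It is not the answer's original code: `retrieve`, `process`, the two thread pools, the 20 ms delay and the `peak` counter are all hypothetical stand-ins added for illustration, and `acquireUninterruptibly()` is used only to keep the example free of checked exceptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedRetrieval {

    // Hypothetical stand-in for the network call: finishes on an I/O pool after a short delay.
    static CompletableFuture<String> retrieve(int id, ExecutorService ioPool) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "record-" + id;
        }, ioPool);
    }

    // Hypothetical stand-in for the CPU-intensive step.
    static String process(String record) {
        return record.toUpperCase();
    }

    // Retrieves and processes ids 1..n with at most k retrievals in flight.
    // peak receives the highest number of simultaneous retrievals observed.
    static List<String> run(int n, int k, AtomicInteger peak) {
        ExecutorService ioPool = Executors.newCachedThreadPool();
        ExecutorService cpuPool = Executors.newSingleThreadExecutor();
        Semaphore sem = new Semaphore(k);
        AtomicInteger inFlight = new AtomicInteger();
        List<String> results = new CopyOnWriteArrayList<>();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int id = 1; id <= n; id++) {
                sem.acquireUninterruptibly(); // the generator thread waits for a permit
                peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                CompletableFuture<String> fut = retrieve(id, ioPool);
                fut.whenComplete((r, e) -> {  // return the permit on success or failure
                    inFlight.decrementAndGet();
                    sem.release();
                });
                pending.add(fut.thenApplyAsync(BoundedRetrieval::process, cpuPool)
                               .thenAccept(results::add));
            }
            // Wait until every record has been retrieved and processed.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            ioPool.shutdown();
            cpuPool.shutdown();
        }
        return results;
    }

    public static void main(String[] args) {
        AtomicInteger peak = new AtomicInteger();
        List<String> results = run(10, 3, peak);
        System.out.println(results.size() + " records processed, peak in flight: " + peak.get());
    }
}
```

In this sketch the permit is returned as soon as a retrieval finishes, so K bounds the number of concurrent retrievals. To bound how far processing may trail behind retrieval instead, the permit could be released at the end of the processing stage.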
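Since the task generator occupies only one thread, there is little point in making it asynchronous. However, if you don't want to dedicate a thread to the task generator and want an asynchronous solution, the main question is which class can play the role of an asynchronous permit counter. You have 3 options: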
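- use the implicit asynchronous permit counter that is part of reactive streams, found in RxJava, Project Reactor, etc.
- use the explicit asynchronous semaphore org.df4j.core.boundconnector.permitstream.Semafor, included in my asynchronous library df4j
- make it yourself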
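As a rough illustration of the "make it yourself" option, the sketch below shows one possible shape for an asynchronous permit counter built on CompletableFuture. The class name `AsyncSemaphore` and its methods are hypothetical. This is not how df4j's Semafor or any reactive library is implemented, only a minimal sketch of the idea that `acquire()` returns a future instead of blocking.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical minimal asynchronous permit counter.
public class AsyncSemaphore {
    private final Queue<CompletableFuture<Void>> waiters = new ArrayDeque<>();
    private int permits;

    public AsyncSemaphore(int permits) {
        this.permits = permits;
    }

    // Never blocks: the returned future completes once a permit has been granted.
    public CompletableFuture<Void> acquire() {
        synchronized (this) {
            if (permits > 0) {
                permits--;
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> waiter = new CompletableFuture<>();
            waiters.add(waiter);
            return waiter;
        }
    }

    // Returns a permit, handing it directly to the oldest waiter if there is one.
    public void release() {
        CompletableFuture<Void> next;
        synchronized (this) {
            next = waiters.poll();
            if (next == null) {
                permits++;
                return;
            }
        }
        // Complete outside the lock so dependent callbacks do not run while holding it.
        next.complete(null);
    }

    public static void main(String[] args) {
        AsyncSemaphore sem = new AsyncSemaphore(1);
        sem.acquire().thenRun(() -> System.out.println("first task started"));
        // No permit is left, so this callback is deferred until release() is called.
        sem.acquire().thenRun(() -> System.out.println("second task started"));
        sem.release(); // the first task ends and its permit passes to the second
    }
}
```

A task generator built on this would chain the next retrieval onto the future returned by `acquire()` rather than blocking a thread, and each task would call `release()` when it ends.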