java - CompletableFuture 以及使用 FileReader 读取,程序不会退出
问题描述
背景
构建一个数据管道,接收到的每条消息都将被异步处理。试图通过以下方式模拟行为
- 从文件中读取消息
- 使用 CompletableFuture 处理
代码
BufferedReader reader = null;
ExecutorService service = Executors.newFixedThreadPool(4);
try {
String filepath = str[0];
FileReaderAsync fileReaderAsync = new FileReaderAsync();
reader = new BufferedReader(new FileReader(filepath));
Random r = new Random();
String line;
while ((line = reader.readLine()) != null) {
Integer val = Integer.valueOf(line.trim());
int randomInt = r.nextInt(5);
Thread.sleep(randomInt * 100);
CompletableFuture.supplyAsync(() -> {
System.out.println("Square : " + val);
return val * val;
}, service)
.thenApplyAsync(value -> {
System.out.println(":::::::Double : " + value);
return 2 * value;
}, service)
.thenAccept(value -> {
System.out.println("Answer : " + value);
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
reader.close();
} catch (Exception e) {
throw new RuntimeException(e.getMessage());
}
}
为简单起见,仅粘贴主方法代码,假设变量已声明并在范围内。
问题
代码
- 程序工作正常但不退出,尝试评论异步逻辑并读取文件。它工作正常并且也结束了。
设计
- 在 Streaming 管道中,如果每条消息都传递给 CompletableFuture 进行处理,此 Async 模型是否适用于每条传入消息?
- 或者它会阻止当前消息被处理?
- 是否需要引入另一个队列,然后从中消费,而不是在传入消息流入时消费?
编辑 1 添加
public void shutdown() {
service.shutdown();
}
和
reader.close();
fileReaderAsync.shutdown();
这成功了。
解决方案
池中有 4 个线程,但 Thread.sleep() 会阻塞主线程。您的程序读取一行,最多块。5 秒,然后将触发异步代码,这根本不需要任何异步,实际上会产生巨大的开销。
不要在异步程序中使用 Thread.sleep()。
但我试图了解你的代码,我可以提供这个:
public int calcWork(final int x) {
return x*x;
}
public void iter_async_rec(final BufferedReader reader) {
String line = reader.readline();
if (line != null) {
int i = Integer.tryParse(line); // checks required
CompetableFuture.supplyAsync(calcWork(i))
.thenSupplyAsync(i->System.out.println(i))
.thenRunAsync(()->iter_asnc_rec(reader))
}
}
另外:大多数时候,只使用标准执行器是最好的选择。相反,给定的样本不会提高速度。
也许看看反应的想法!?反应式Java
推荐阅读
- iis - 在 Win10 中使用本地 IIS 服务器进行问题测试
- python - RuntimeError:输入类型(torch.FloatTensor)和权重类型(torch.cuda.FloatTensor)应该相同 - PyTorch
- python - 自动进行 Excel 对帐
- wordpress - ACF - 创建帖子时初始化强制字段
- angular - Angular 中的二级导航
- c++ - 尝试初始化类中的集合时出错
- file - 输出文件的绝对路径?
- javascript - 使用stacktrace在节点中获取函数调用者文件给了我错误的文件路径
- python - 可以在循环中多次使用 getline() 吗?- Cython,文件读取
- css - 在 Bootstrap CSS 中将一个 div 置于另一个上方(浮动)?