java - 如何并行处理文件的行?
问题描述
我想读取一个大文件,处理每一行并将结果插入数据库。我的目标是并行处理行,因为每个进程都是一个长时间运行的任务。因此,我希望一个线程继续读取,多个线程继续处理,一个线程继续以块的形式插入数据库。
我把它分解如下:
1)按顺序逐行读取文件(简单)
2)将每一行发送到一个线程池(3个线程),因为处理是长时间运行的任务。在线程池忙时阻止进一步的行读取。
3) 将每个处理过的行从每个 theadpool 写入 StringBuffer
4) 监控缓冲区大小,并将结果以块的形式写入数据库(例如,每 1000 个条目)
ExecutorService executor = Executors.newFixedThreadPool(3);
StringBuffer sb = new StringBuffer();
String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
count.getAndIncrement();
Future<String> future = executor.submit(() -> {
return processor.process(line);
});
//PROBLEM: this blocks until the future returns
sb.append(future.get());
if (count.get() == 100) {
bufferChunk = sb;
count = new AtomicInteger(0);
sb = new StringBuffer();
databaseService.batchInsert(bufferChunk.toString());
}
}
问题:
future.get()
将始终阻止阅读器,直到未来返回结果缓冲区“监控”可能没有正确完成
可能我这样做的方式不对。但我怎样才能做到这一点?
旁注:文件大小约为 10GB,所以我不能先将整个文件读入内存来准备并行任务。
解决方案
我发现以下解决方案很优雅。它只是众多可能中的一种,但它在概念上很简单,而且
- 它会限制读取,
- 仅累积最少量的状态以在最后报告就绪
- 不需要显式处理线程
我只是将实际的测试方法与专用 GitHub存储库中提供的完整测试设置和辅助数据结构一起放在这里:
private final AtomicInteger count = new AtomicInteger();
private final Consumer<String> processor = (value) -> {
count.incrementAndGet();
};
@Test
public void onlyReadWhenExecutorAvailable() throws Exception {
Executor executor = Executors.newCachedThreadPool();
CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
String value = reader.read();
if (value == null) {
break;
}
semaphore.acquire();
CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
.thenAcceptAsync(v -> {
processor.accept(v);
semaphore.release();
}, executor);
done = done.thenCompose($ -> future);
}
done.get();
assertEquals(ENTRIES, count.get());
}
推荐阅读
- java - 有没有办法解压缩带有各种子文件夹的文件?
- flutter - 点击相应的列表视图项目时如何显示详细信息页面
- java - 在 VMware 虚拟机上运行 Java 客户端
- python - 用于格式化熊猫数据框的循环
- python - 图片无法使用 PIL 库在 Google colab 中显示
- r - R - 从起点和距离的纬度/经度
- go - 如何让 k8s 按照特定规则分配 gpu/npu 设备
- python-requests - 使用请求在 Python 中邀请用户使用 Azure AD 时出现 Microsoft Graph“无法读取 JSON 请求有效负载”错误
- azure-cosmosdb - 我可以用 Cosmos DB SDK 3.0 替换 Document DB Generic Repository 吗?
- linux - 不应使用 echo 命令打印反斜杠 (\)