首页 > 解决方案 > 如何并行处理文件的行?

问题描述

我想读取一个大文件,处理每一行并将结果插入数据库。我的目标是并行处理行,因为每个进程都是一个长时间运行的任务。因此,我希望一个线程继续读取,多个线程继续处理,一个线程继续以块的形式插入数据库。

我把它分解如下:

1)按顺序逐行读取文件(简单)

2)将每一行发送到一个线程池(3个线程),因为处理是长时间运行的任务。在线程池忙时阻止进一步的行读取。

3) 将每个处理过的行从每个 theadpool 写入 StringBuffer

4) 监控缓冲区大小,并将结果以块的形式写入数据库(例如,每 1000 个条目)

ExecutorService executor = Executors.newFixedThreadPool(3);

StringBuffer sb = new StringBuffer();

String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
    count.getAndIncrement();
    Future<String> future = executor.submit(() -> {
        return processor.process(line);
    });

    //PROBLEM: this blocks until the future returns
    sb.append(future.get());

    if (count.get() == 100) {
        bufferChunk = sb;
        count = new AtomicInteger(0);
        sb = new StringBuffer();

        databaseService.batchInsert(bufferChunk.toString());
    }
}

问题:

可能我这样做的方式不对。但我怎样才能做到这一点?

旁注:文件大小约为 10GB,所以我不能先将整个文件读入内存来准备并行任务。

标签: javafutureexecutorservice

解决方案


我发现以下解决方案很优雅。它只是众多可能中的一种,但它在概念上很简单,而且

  • 它会限制读取,
  • 仅累积最少量的状态以在最后报告就绪
  • 不需要显式处理线程

我只是将实际的测试方法与专用 GitHub存储库中提供的完整测试设置和辅助数据结构一起放在这里:

private final AtomicInteger count = new AtomicInteger();

private final Consumer<String> processor = (value) -> {
    count.incrementAndGet();
};

@Test
public void onlyReadWhenExecutorAvailable() throws Exception {

    Executor executor = Executors.newCachedThreadPool();

    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
        String value = reader.read();
        if (value == null) {
            break;
        }

        semaphore.acquire();

        CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
            .thenAcceptAsync(v -> {
                processor.accept(v);
                semaphore.release();
            }, executor);

        done = done.thenCompose($ -> future);
    }
    done.get();

    assertEquals(ENTRIES, count.get());
}

推荐阅读