首页 > 解决方案 > 如何有效管理大量 Java 可运行任务

问题描述

我有一个具有 100K+ 标识符的数据库表。任务是读取每个 ID 并启动耗时的充实过程,其结果一旦完成,就需要持久化到数据库中。

当前基于 Java 的解决方案基于以下流程:

Java ThreadPool 和 Executor 框架用于容纳此功能。

未来的预期是标识符的数量将急剧增长,因此为了避免潜在的内存压力 - 我正在考虑开始批量读取 ID 并在现有任务之一完成时创建相关任务(即按需)。

例如,一开始,在主线程中,从数据库中获取 500 个 ID,创建 10 个任务(每个任务 50 个 ID),将它们传递给工作线程来处理它们,一旦第一个任务完成 - 从数据库中提取其他 ID并创建一个附加任务。然后重复该过程,直到处理完所有 ID。

我的问题是如何通知主线程特定任务/任务已完成,以允许主线程从数据库中提取额外的 ID 并创建额外的任务?这是解决此问题的最佳方法还是在这种情况下其他架构会更好?

标签: javamultithreadingconcurrencythreadpoolexecutor

解决方案


当您将 Runnable/Callable 对象提交给执行器服务时,您会返回一个Future对象。您可以跟踪这些Future对象,并询问它们的状态。每个人都会报告它是否被取消或完成。

全部完成后,向执行器服务提交另一批 Runnable/Callable 任务。

您可以将此Future-checker batch-submitter 作为后台线程上的另一个任务运行,并使用 aScheduledExecutorService重复执行。主线程不直接参与。

除此之外,我建议你检查你的假设。显然,您担心实例化数百万个 Runnable/Callable 对象,以免耗尽内存。但我怀疑每个 Runnable/Callable 对象和结果Future对象都占用大量内存。我建议您运行模拟以查看并使用监视器或分析工具检查内存使用情况。

这是一些示例代码。首先我的Callable.

package work.basil.example;

import java.util.concurrent.Callable;

public record Enrichment(Integer id) implements Callable
{
    @Override
    public Boolean call ( ) throws Exception
    {
        System.out.println( this.toString() );
        return Boolean.TRUE; // Report success.
    }
}

还有一些代码可以预定 1000 万个可运行实例,并累积Future提交给执行器服务时产生的每个对象。

Instant start = Instant.now();
System.out.println( "INFO - Start running demo at " + start );

int limit = 10_000_000;
List < Future > futures = new ArrayList <>( limit );
ExecutorService executorService = null;
try
{
    executorService = Executors.newFixedThreadPool( 3 );
    for ( int i = 1 ; i <= limit ; i++ )
    {
        Callable < Boolean > callable = new Enrichment( i );
        Future < Boolean > future = executorService.submit( callable );
        futures.add( future );
    }
    System.out.println( "INFO - Submitted %d tasks.".formatted( limit ) );
}
finally
{
    if ( Objects.nonNull( executorService ) ) { executorService.shutdown(); }
}

// Sleep our main thread long enough for background work to finish.
try
{
    System.out.println( "INFO - Sleeping main thread." );
    Thread.sleep( TimeUnit.MINUTES.toMillis( 1 ) );
}
catch ( InterruptedException e )
{
    e.printStackTrace();
}

Instant done = Instant.now();
System.out.println( "INFO - Done running demo at " + done );

请注意,在这个类的特殊情况下,我不需要真正实例化新对象Enrichment。我们可以简单地在所有 1000 万次run. 但我想要一个更糟糕的例子——如果你的场景确实需要新的对象,我想看看对内存的大致影响。

在我在 64 位 Intel Mac mini 上使用来自 AdoptOpenJDK 的 Java 15 进行的试验中,这项工作花费了不到一分钟的时间,并且使用了 4.5 gigs 来完成 1000 万个任务。


顺便说一句,在未来,Project Loom可能会简化你的工作。您将能够简单地安排数百万个“虚拟线程”(纤程)在有限数量的平台/内核线程上运行。Project Loom抢先体验版本现已推出。请参阅 Ron Pressler 在 YouTube 上的 2020 年末演讲。


推荐阅读