首页 > 解决方案 > 并行和顺序处理 Java 任务

问题描述

在我的程序中,用户可以通过一个界面触发不同的任务,这需要一些时间来处理。因此它们由线程执行。到目前为止,我已经实现了它,因此我有一个带有一个线程的执行器,它一个接一个地执行所有任务。但现在我想把所有东西都并行化一点。

即我想并行运行任务,除非它们具有相同的路径,然后我想按顺序运行它们。例如,我的池中有 10 个线程,当一个任务进来时,该任务应该分配给当前正在处理具有相同路径的任务的工作人员。如果worker当前没有处理具有相同路径的任务,则该任务应该由当前空闲的worker处理。

附加信息:任务是在本地文件系统中的文件上执行的任何类型的任务。例如,重命名文件。因此,任务具有属性path。而且我不想同时对同一个文件执行两个任务,所以这样的路径相同的任务应该顺序执行。

这是我的示例代码,但还有工作要做:

我的一个问题是,我需要一种安全的方法来检查工作人员当前是否正在运行并获取当前正在运行的工作人员的路径。安全的意思是,不会发生同时访问的问题或其他线程问题。

    public class TasksOrderingExecutor {
    
        public interface Task extends Runnable {
            //Task code here
            String getPath();
        }
    
        private static class Worker implements Runnable {
    
            private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

            //some variable or mechanic to give the actual path of the running tasks??
    
            private volatile boolean stopped;
    
            void schedule(Task task) {
                tasks.add(task);
            }
    
            void stop() {
                stopped = true;
            }
    
            @Override
            public void run() {
                while (!stopped) {
                    try {
                        Task task = tasks.take();
                        task.run();
                    } catch (InterruptedException ie) {
                        // perhaps, handle somehow
                    }
                }
            }
        }
    
        private final Worker[] workers;
        private final ExecutorService executorService;
    
        /**
         * @param queuesNr nr of concurrent task queues
         */
        public TasksOrderingExecutor(int queuesNr) {
            Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
            executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
            workers = new Worker[queuesNr];
            for (int i = 0; i < queuesNr; i++) {
                Worker worker = new Worker();
                executorService.submit(worker);
                workers[i] = worker;
            }
        }
    
        public void submit(Task task) {
            Worker worker = getWorker(task);
            worker.schedule(task);
        }
    
        public void stop() {
            for (Worker w : workers) w.stop();
            executorService.shutdown();
        }
    
        private Worker getWorker(Task task) {
            //check here if a running worker with a specific path exists? If yes return it, else return a free worker. How do I check if a worker is currently running?
            return workers[task.getPath() //HERE I NEED HELP//];
        }
    }

标签: javamultithreadingsynchronizationthreadpoolexecutor

解决方案


似乎您需要某种“任务调度程序”来执行或保存某些任务,具体取决于某些标识符(这里是任务应用到的文件的路径)。

你可以使用这样的东西:

public class Dispatcher<I> implements Runnable {

/**
 * The executor used to execute the submitted task
 */
private final Executor executor;

/**
 * Map of the pending tasks
 */
private final Map<I, Deque<Runnable>> pendingTasksById = new HashMap<>();

/**
 * set containing the id that are currently executed
 */
private final Set<I> runningIds = new HashSet<>();

/**
 * Action to be executed by the dispatcher
 */
private final BlockingDeque<Runnable> actionQueue = new LinkedBlockingDeque<>();

public Dispatcher(Executor executor) {
    this.executor = executor;
}

/**
 * Task in the same group will be executed sequentially (but not necessarily in the same thread)
 * @param id the id of the group the task belong
 * @param task the task to execute
 */
public void submitTask(I id, Runnable task) {
    actionQueue.addLast(() -> {
        if (canBeLaunchedDirectly(id)) {
            executeTask(id, task);
        } else {
            addTaskToPendingTasks(id, task);
            ifPossibleLaunchPendingTaskForId(id);
        }
    });
}


@Override
public void run() {
    while (!Thread.currentThread().isInterrupted()) {
        try {
            actionQueue.takeFirst().run();
        } catch (InterruptedException e) {
            Thread.currentThread().isInterrupted();
            break;
        }
    }
}


private void addTaskToPendingTasks(I id, Runnable task) {
    this.pendingTasksById.computeIfAbsent(id, i -> new LinkedList<>()).add(task);
}


/**
 * @param id an id of a group
 * @return true if a task of the group with the provided id is currently executed
 */
private boolean isRunning(I id) {
    return runningIds.contains(id);
}

/**
 * @param id an id of a group
 * @return an optional containing the first pending task of the group,
 * an empty optional if no such task is available
 */
private Optional<Runnable> getFirstPendingTask(I id) {
    final Deque<Runnable> pendingTasks = pendingTasksById.get(id);
    if (pendingTasks == null) {
        return Optional.empty();
    }
    assert !pendingTasks.isEmpty();
    final Runnable result = pendingTasks.removeFirst();
    if (pendingTasks.isEmpty()) {
        pendingTasksById.remove(id);
    }
    return Optional.of(result);
}

private boolean canBeLaunchedDirectly(I id) {
    return !isRunning(id) && pendingTasksById.get(id) == null;
}

private void executeTask(I id, Runnable task) {
    this.runningIds.add(id);
    executor.execute(() -> {
        try {
            task.run();
        } finally {
            actionQueue.addLast(() -> {
                runningIds.remove(id);
                ifPossibleLaunchPendingTaskForId(id);
            });
        }
    });
}

private void ifPossibleLaunchPendingTaskForId(I id) {
    if (isRunning(id)) {
        return;
    }
    getFirstPendingTask(id).ifPresent(r -> executeTask(id, r));
}

}

要使用它,您需要在单独的线程中启动它(或者您可以将其调整为更清洁的解决方案),如下所示:

    final Dispatcher<Path> dispatcher = new Dispatcher<>(Executors.newCachedThreadPool());
    new Thread(dispatcher).start();
    dispatcher.submitTask(path, task1);
    dispatcher.submitTask(path, task2);

这是基本示例,您可能需要保留线程,甚至更好地将所有这些都包装在一个类中。


推荐阅读