首页 > 解决方案 > 使用 ExecutorService 详细说明大列表中的项目

问题描述

我有大量的项目,我需要对每个项目执行一些操作而不更改它们。

目前我所尝试的只是使用 ExecutorService 和固定大小的池,但它会详细说明一定数量的项目然后停止,代码如下所示:

val pool = Executors.newFixedThreadPool(30)
val cursor = ....
var current = 0

while (cursor.moveToNext()) {  //This is a actually a cursor rather than a list
    val worker = Runnable {
        //elaborate data in cursor

        //print somewhere the current item count
        //current++
    }

    pool.submit(worker)
}

这似乎确实详细说明了一些项目,我的设备上的 40 个,然后停止。提交可以排队的线程数有限制吗?有没有更好的方法来做到这一点?我有很多数据,我当前的设备上有大约 37k 个条目

标签: javaandroidmultithreadingkotlinexecutorservice

解决方案


我建议不要为每个项目创建一个新的 Runnable 来执行,这是浪费资源。取而代之的是,将所有项目放入线程安全队列并提交 N 个任务,每个任务在一个循环中池/从队列中获取下一个项目并处理它。这是工作队列模式:

    final int numOfWorkers = 30;

    final BlockingQueue items = new ArrayBlockingQueue(1000);

    final ExecutorService pool = Executors.newFixedThreadPool(numOfWorkers);

    for (int i = 0; i < numOfWorkers; i++) {
        pool.submit(() -> {
            while (true) {
                try {
                    final Object item = items.take();

                    // process the item
                    // ...
                } catch (InterruptedException e) { // we are here because of
                    // pool.shutdownNow() is called
                    Thread.currentThread().interrupt();
                    break;
                } catch (Exception e) { // an error while processing the item
                    e.printStackTrace(); // use some logging?
                } finally {
                    if (!Thread.currentThread().isInterrupted()) {
                        // notify about an item has been processed/queue changed
                        synchronized (items) {
                            items.notify();
                        }
                    }
                }
            }
        });
    }

    val cursor = ....

    while (cursor.moveToNext()) {  //This is a actually a cursor rather than a list
        items.put(cursor.item); // put next item to the queue
    }

    // wait until the queue is empty/all items have been processed
    synchronized(items) {
        while (!items.isEmpty()) {
            items.wait();
        }
    }

    pool.shutdownNow(); // now interrupt all ExecutorService's threads

并使用 AtomicXXX 或 LongAdder 之类的东西在多线程环境中制作计数器。


推荐阅读