首页 > 解决方案 > 在 while 循环中轮询阻塞队列时安全地线程

问题描述

考虑下面的示例,其中ItemDeletionManager可以将要删除的项目排入队列。工作线程可以调用它的createJob()方法,该方法将创建一个复合作业以一次删除所有项目。假设工作线程每隔几分钟左右定期运行一次。但是要删除的项目每隔几秒钟就会排队。这就是为什么有必要创建一个复合作业而不是让工作人员在每次迭代中为一个项目创建一个作业。

如果我们现在有两个工人共享同一个经理,如下所示,

ItemDeletionManager itemManager = new ItemDeletionManager();

DeletionWorker a = new DeletionWorker(itemManager);
DeletionWorker b = new DeletionWorker(itemManager);

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
scheduledExecutorService.scheduleAtFixedRate(a, 0, 10, TimeUnit.MINUTES);
scheduledExecutorService.scheduleAtFixedRate(b, 0, 10, TimeUnit.MINUTES);

我最终可能会遇到只有一个项目要删除的情况,并且两个工作人员都已验证阻塞队列不为空。但是在一个线程轮询要删除的项目并创建它的删除分配后,第二个工作人员得到一个null并最终创建一个空的删除分配。

最简单的选择是synchronize ItemDeletionManager创建工作。但是有更好的解决方案吗?


/**
 * Worker that executes ItemDeletionManager's create job.
 */
class DeletionWorker implements Runnable {

    ItemDeletionManager itemManager;

    public DeletionWorker(ItemDeletionManager itemManager) {
        this.itemManager = itemManager;
    }

    @Override
    public void run() {
        itemManager.createJob();
    }
}

/**
 * Manages deletion of Items.
 */
class ItemDeletionManager {

    private final Queue<Integer> idsOfItemsToDelete;

    public ItemDeletionManager() {
        this.idsOfItemsToDelete = new LinkedBlockingQueue<>();
    }

    public void enqueue(int itemId) {
        this.idsOfItemsToDelete.add(itemId);
    }

    public CompoundJob createJob() {
        CompoundJob job = new CompoundJob();
        while (!idsOfItemsToDelete.isEmpty()) {
            // 2 threads can reach this point at the same time
            Integer itemId = idsOfItemsToDelete.poll();
            job.addSubAssignment(DeletionAssignment.of(itemId));
        }
        return job;
    }
}

/**
 * Implementers of this handle its respective requirement.
 * Eg: DeletionAssignment handles deletion of item, CreationAssignment handles creation of item and so on..
 */
interface Assignment {

    void execute();
}

/**
 * Executes a collection of assignments.
 */
class CompoundJob {

    private final List<Assignment> subAssignments = new ArrayList<>();

    public void addSubAssignment(Assignment assignment) {
        subAssignments.add(assignment);
    }

    public void doJob() {
        for (Assignment assignment : subAssignments) {
            assignment.execute();
        }
    }
}

标签: javamultithreading

解决方案


正如您已经确定的那样,调用 isEmpty() 然后 poll() 不是线程安全的,除非您进行同步,但是使用并发数据结构这样做是非常可悲的。

事实上,您不应该事先调用 isEmpty() 。无论你调用 take() 并且它阻塞直到有一个项目,或者你直接调用 poll() 而不首先检查 isEmpty() 并且知道,如果你得到 null,那么这意味着队列是空的。您还可以通过指定时间限制来选择 take() 和 poll() 之间的中间选项。

我建议你直接使用 poll() 并处理 null,如下所示:

while (true) {
  Integer itemId = idsOfItemsToDelete.poll();
  if (itemId==null) break;
  job.addSubAssignment(DeletionAssignment.of(itemId));
}

即使多个线程同时执行相同的代码,也可以保证您不会获得两次相同的项目。但是,检索不会平均分配,即可能会发生一个线程获取所有项目而另一个不获取任何项目的情况。


推荐阅读