首页 > 解决方案 > 让线程在开始下一组任务之前等待所有任务完成

问题描述

我有一个由几个阶段组成的管道。同一阶段的作业可以并行处理。但是必须先完成第 1 阶段的所有工作,然后才能开始从事第 2 阶段的工作,等等。

我正在考虑使用CountDownEvent.

我的基础结构是

this.WorkerCountdownEvent = new CountdownEvent(MaxJobsInStage);
this.WorkerCountdownEvent.Signal(MaxJobsInStage); // Starts all threads
// Each thread runs the following code

for (this.currentStage = 0; this.currentStage < this.PipelineStages.Count; this.currentStage++)
{
    this.WorkerCountdownEvent.Wait();
    var stage = this.PipelineStages[this.currentStage];
    if (stage.Systems.Count < threadIndex)
    {
        var system = stage.Systems[threadIndex];
        system.Process();
    }

    this.WorkerCountdownEvent.Signal(); // <--

}

这对于处理一个阶段很有效。但是到达的第一个线程this.WorkerCountdownEvent.Signal()将导致应用程序崩溃,因为它试图将信号减小到零以下。

当然,如果我想阻止这种情况,并且让工作再次等待,我必须打电话给this.WorkerCountdownEvent.Reset(). 但是我必须在所有线程开始工作之后调用它,但在一个线程完成其工作之前。这似乎是一项不可能完成的任务?

我是否使用了错误的同步原语?或者我应该使用两个倒计时事件?还是我完全错过了什么?

(顺便说一句,作业通常需要不到一毫秒的时间,所以如果有人有更好的方法来使用像 ManualResetEventSlim 这样的“苗条”原语来做到这一点,则可以加分。线程池或任务<>不是我正在寻找的方向,因为这些线程将存在很长(几小时),并且需要每秒通过管道 60 次。所以在这里停止/启动任务的开销是相当大的)。

编辑:这个问题被标记为两个问题的重复。其中一个问题的答案是“使用 thread.Join()”,另一个问题是“使用 TPL”,这两个答案(在我看来)显然不是关于流水线和线程原语的问题的答案,例如CountDownEvent.

标签: c#multithreading.net-core

解决方案


我认为最适合这种情况的同步原语是Barrier.

使多个任务能够通过多个阶段并行地协作处理算法。

使用示例:

private Barrier _barrier = new Barrier(this.WorkersCount);

// Each worker thread runs the following code
for (i = 0; i < this.StagesCount; i++)
{
    // Here goes the work of a single worker for a single stage...
    _barrier.SignalAndWait();
}

更新:如果您希望工作人员异步等待信号,这里有一个AsyncBarrier实现。


推荐阅读