首页 > 解决方案 > 从向量中删除已完成的线程

问题描述

我有很多工作,我想并行运行其中的一部分。例如。我有 100 个作业要运行,我想一次运行 10 个线程。这是我当前解决此问题的代码:

#include <thread>
#include <vector>
#include <iostream>
#include <atomic>
#include <random>
#include <mutex>

int main() {
    constexpr std::size_t NUMBER_OF_THREADS(10);
    std::atomic<std::size_t> numberOfRunningJobs(0);

    std::vector<std::thread> threads;
    std::mutex maxThreadsMutex;
    std::mutex writeMutex;
    std::default_random_engine generator;
    std::uniform_int_distribution<int> distribution(0, 2);

    for (std::size_t id(0); id < 100; ++id) {
        if (numberOfRunningJobs >= NUMBER_OF_THREADS - 1) {
            maxThreadsMutex.lock();
        }
        ++numberOfRunningJobs;
        threads.emplace_back([id, &numberOfRunningJobs, &maxThreadsMutex, &writeMutex, &distribution, &generator]() {
            auto waitSeconds(distribution(generator));
            std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));
            writeMutex.lock();
            std::cout << id << " " << waitSeconds << std::endl;
            writeMutex.unlock();
            --numberOfRunningJobs;
            maxThreadsMutex.unlock();
        });
    }

    for (auto &thread : threads) {
        thread.join();
    }

    return 0;
}

在 for 循环中,我检查有多少作业正在运行,如果一个插槽是空闲的,我会在向量中添加一个新线程。在每个线程结束时,我会减少正在运行的作业数量并解锁互斥锁以启动一个新线程。这解决了我的任务,但有一点我不喜欢。我需要一个大小为 100 的向量来存储所有线程,最后我需要加入所有 100 个线程。我想在完成后从向量中删除每个线程,以便向量最多包含 10 个线程,最后我必须加入 10 个线程。我考虑通过引用 lambda 来传递向量和迭代器,以便我可以在最后删除元素,但我不知道如何。如何优化我的代码以在向量中使用最多 10 个元素?

标签: c++multithreadingc++11

解决方案


由于您似乎不需要非常细粒度的线程控制,因此我建议使用 OpenMP 解决此问题。OpenMP 是一种基于行业标准指令的方法,用于并行化 C、C++ 和 FORTRAN 代码。这些语言的每个主要编译器都实现了它。

使用它可以显着降低代码的复杂性:

#include <iostream>
#include <random>

int main() {
    constexpr std::size_t NUMBER_OF_THREADS(10);

    std::default_random_engine generator;
    std::uniform_int_distribution<int> distribution(0, 2);

    //Distribute the loop between threads ensuring that only
    //a specific number of threads are ever active at once.
    #pragma omp parallel for num_threads(NUMBER_OF_THREADS)
    for (std::size_t id(0); id < 100; ++id) {
        #pragma omp critical //Serialize access to generator
        auto waitSeconds(distribution(generator));

        std::this_thread::sleep_for(std::chrono::seconds(waitSeconds));

        #pragma omp critical //Serialize access to cout
        std::cout << id << " " << waitSeconds << std::endl;
    }        

    return 0;
}

要使用 OpenMP,您需要编译:

g++ main.cpp -fopenmp

有时需要生成和直接协调线程,但大量旨在使并行性更容易的新语言和库说明了使用更简单的并行性路径就足够了的用例数量。


推荐阅读