首页 > 解决方案 > 多个线程等待所有线程完成,直到新工作开始

问题描述

我正在尝试创建一种线程池,它在单独的线程上运行函数,并且只有在所有函数完成后才开始新的迭代。

map<size_t, bool> status_map;
vector<thread> threads;
condition_variable cond;

bool are_all_ready() {
  mutex m;
  unique_lock<mutex> lock(m);
  for (const auto& [_, status] : status_map) {
    if (!status) {
      return false;
    }
  }
  return true;
}

void do_little_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(1));
  cout << id << " did little work..." << endl;
}

void do_some_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(2));
  cout << id << " did some work..." << endl;
}

void do_much_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(4));
  cout << id << " did much work..." << endl;
}

void run(const function<void(size_t)>& function, size_t id) {
  while (true) {
    mutex m;
    unique_lock<mutex> lock(m);

    cond.wait(lock, are_all_ready);

    status_map[id] = false;
    cond.notify_all();

    function(id);

    status_map[id] = true;
    cond.notify_all();
  }
}
 
int main() {
  threads.push_back(thread(run, do_little_work, 0));
  threads.push_back(thread(run, do_some_work, 1));
  threads.push_back(thread(run, do_much_work, 2));

  for (auto& thread : threads) {
    thread.join();
  }

  return EXIT_SUCCESS;
}

我希望得到输出:

0 did little work...
1 did some work...
2 did much work...
0 did little work...
1 did some work...
2 did much work...
        .
        .
        .

在各自的超时之后但是当我运行程序时我只得到

0 did little work...
0 did little work...
        .
        .
        .

我还不得不说,我对多线程相当陌生,但在我的理解中,condition_variable应该阻塞每个线程,直到谓词返回 true。在我的情况下are_all_ready,应该在所有函数都返回后返回 true。

标签: c++multithreading

解决方案


按原样,您的程序由于同时访问status_map.

当你这样做时:

void run(const function<void(size_t)>& function, size_t id)
{
...
    mutex m;
    unique_lock<mutex> lock(m);
...
    status_map[id] = false;

创建的locks 是局部变量,每个线程一个,因此是独立的。因此,它不会阻止多个线程同时写入status_map,从而导致崩溃。这就是我在我的机器上得到的。

现在,如果你做mutex静态,一次只有一个线程可以访问地图。但这也使得一次只有一个线程运行。有了这个,我看到 0、1 和 2 正在运行,但一次只运行一次,并且前一个线程很有可能再次运行。

我的建议,回到绘图板并使其更简单。所有线程同时运行,单个互斥锁来保护映射,只锁定互斥锁来访问映射,而且......好吧,事实上,我什至不认为需要条件变量。

例如有什么问题:

#include <thread>
#include <iostream>
#include <vector>

using namespace std;

vector<thread> threads;

void do_little_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(1));
  cout << id << " did little work..." << endl;
}

void do_some_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(2));
  cout << id << " did some work..." << endl;
}

void do_much_work(size_t id) {
  this_thread::sleep_for(chrono::seconds(4));
  cout << id << " did much work..." << endl;
}

void run(const function<void(size_t)>& function, size_t id) {
  while (true) {
    function(id);
  }
}

int main() {
  threads.push_back(thread(run, do_little_work, 0));
  threads.push_back(thread(run, do_some_work, 1));
  threads.push_back(thread(run, do_much_work, 2));

  for (auto& thread : threads) {
    thread.join();
  }

  return EXIT_SUCCESS;
}

推荐阅读