c++ - C++多线程错误:单生产者多消费者
问题描述
我正在尝试实现 Single producer Multiple consumer ,但是下面的代码无法编译。有人可以帮助解决这个错误吗?也可以从这个池中唤醒所有线程并且随机线程能够获取锁吗?
- TIA
` threadPool/main.cpp:4: /Library/Developer/CommandLineTools/usr/bin/../include/c++/v1/thread:364:17:
错误:没有用于初始化 '_Gp' 的匹配构造函数(又名 'tuple<unique_ptrstd::__1::__thread_struct, void (TestClass::*)(), TestClass>') new _Gp(std::move(__tsp), ^ ~~~~~~~~~~~~~~~~~
ls/usr/bin/../include/c++/v1/type_traits:2422:12: error: call to implicitly-deleted copy constructor of 'typename
衰减::type' (又名'TestClass') return _VSTD::forward<_Tp>(__t);
----------------------------------------------
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <condition_variable>
using namespace std;
class TestClass{
public:
void producer(int i) {
unique_lock<mutex> lockGuard(mtx);
Q.push(i);
lockGuard.unlock();
cond.notify_all();
}
void consumer() {
unique_lock<mutex> lockGuard(mtx);
cond.wait(lockGuard, [this]() {
return !Q.empty();
});
cout<<this_thread::get_id();
cout<<Q.front()<<endl;
Q.pop();
lockGuard.unlock();
};
private:
mutex mtx;
condition_variable cond;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
int MAX_THREADS = std::thread::hardware_concurrency()-1;
vector<thread> ThreadVector;
TestClass testObj;
for(int i=0; i<MAX_THREADS; i++){
ThreadVector.emplace_back(&TestClass::consumer, std::move(testObj));
cout<<"Pool threadID:" <<ThreadVector[i].get_id()<<endl;
}
TestClass testObj2;
for(int i=0; i<10; i++) {
testObj.producer(i);
}
for(auto &&t : ThreadVector) {
t.join();
}
return 0;
}
`
Another version to call threads
int main()
{
std::vector<std::thread> vecOfThreads;
std::function<void(TestClass&)> func = [&](TestClass &obj) {
while(1) {
obj.consumer();
}
};
unsigned MAX_THREADS = std::thread::hardware_concurrency()-1;
TestClass obj;
for(int i=0; i<MAX_THREADS; i++) {
std::thread th1(func, std::ref(obj));
vecOfThreads.emplace_back(std::move(th1));
}
TestClass prod;
for(int i=0; i<10; i++) {
prod.producer(i);
}
for (std::thread & th : vecOfThreads)
{
if (th.joinable())
th.join();
}
return 0;
}
解决方案
std::move(testObj)
应该是&testObj
(指向要调用的对象的指针consumer
)-或std::ref(testobj)
(变成reference_wrapper
(也持有指向对象的指针)。- 您应该调用
produce
至少与线程一样多的次数,否则程序将无法完成。 - 你不需要
unlock
手动。守卫在超出范围时会自动解锁。
例子:
class TestClass {
public:
void producer(int i) {
lock_guard<mutex> lockGuard(mtx); // here a lock_guard is enough
Q.push(i);
// no manual unlocking
cond.notify_all();
}
void consumer() {
unique_lock<mutex> lockGuard(mtx);
cond.wait(lockGuard, [this] { return !Q.empty(); });
cout << this_thread::get_id();
cout << Q.front() << endl;
Q.pop();
};
private:
mutex mtx;
condition_variable cond;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
unsigned MAX_THREADS = std::thread::hardware_concurrency() - 1;
vector<thread> ThreadVector;
ThreadVector.reserve(MAX_THREADS); // since you know how many, reserve
TestClass testObj;
for(unsigned i = 0; i < MAX_THREADS; i++) {
// here, &testobj
ThreadVector.emplace_back(&TestClass::consumer, &testObj);
cout << "Pool threadID:" << ThreadVector[i].get_id() << endl;
}
// produce MAX_THREADS of things to put in the queue:
for(int i = 0; i < MAX_THREADS; i++) {
testObj.producer(i);
}
for(auto&& t : ThreadVector) {
t.join();
}
}
关于评论部分中的问题:如果您想让consumer
线程保持运行直到您告诉它们退出,您可以添加线程监控run
的另一个变量(此处称为) 。consumer
例子:
#include <condition_variable>
#include <iostream>
#include <queue>
#include <thread>
#include <vector>
using namespace std;
class TestClass {
public:
void producer(int i) {
lock_guard<mutex> lockGuard(mtx);
Q.push(i);
to_pool.notify_one();
}
void consumer() {
while(true) {
unique_lock<mutex> lockGuard(mtx);
to_pool.wait(lockGuard, [this] { return !run || !Q.empty(); });
if(!run) break; // time to quit
cout << this_thread::get_id() << ' ' << Q.front() << endl;
Q.pop();
// Tell producer that we picked one from the queue.
// if it's only interesting to notify when the queue is empty,
// add: if(Q.empty())
to_producer.notify_one();
}
};
void stop() {
lock_guard<mutex> lockGuard(mtx);
run = false; // tell all pool threads to quit
to_pool.notify_all();
}
void wait_for_all_work_to_be_done() {
std::unique_lock<mutex> lg(mtx);
to_producer.wait(lg, [this] { return Q.empty(); });
}
private:
bool run = true;
mutex mtx;
condition_variable to_pool;
condition_variable to_producer;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
unsigned MAX_THREADS = std::thread::hardware_concurrency() - 1;
vector<thread> ThreadVector;
ThreadVector.reserve(MAX_THREADS);
TestClass testObj;
for(unsigned i = 0; i < MAX_THREADS; i++) {
ThreadVector.emplace_back(&TestClass::consumer, &testObj);
cout << "Pool threadID:" << ThreadVector[i].get_id() << endl;
}
for(int i = 0; i < MAX_THREADS / 2; i++) {
testObj.producer(i);
}
testObj.wait_for_all_work_to_be_done();
// stop pool threads
testObj.stop();
for(auto&& t : ThreadVector) t.join();
}
推荐阅读
- visual-studio-code - 如何更改 VSCode 字体
- android-studio - Android 上的 QApplication,从 JNI 开始,一直到 SEGFAULT
- r - 如何在数据框中选择/过滤/删除仅包含 NA 或 N/A 字符串的列?
- visual-studio-code - 带有扩展的Visual Studio代码环绕找不到环绕.自定义片段
- node.js - 反向代理以使用 virtualmin 在静态 url 上为本地 reactjs 应用程序提供服务
- python - python中selenium库的问题(而不是给程序一个错误,返回None)
- mongodb - 在嵌套数组 MongoDB 中加入文档
- firebase - 任何人都可以帮助我解决在 Flutter Firebase 数据访问中使用 Snapshot 在显示错误的文本中出现的这个错误吗?
- python - 如何通过多重继承实现抽象类?
- regex - 如何从 URL 路径中删除 index.php,除非路径是 /index.php/