c++ - 如何等待此线程池中的所有任务完成?
问题描述
我正在尝试写一ThreadPool
堂课
class ThreadPool {
public:
ThreadPool(size_t numberOfThreads):isAlive(true) {
for(int i =0; i < numberOfThreads; i++) {
workerThreads.push_back(std::thread(&ThreadPool::doJob, this));
}
#ifdef DEBUG
std::cout<<"Construction Complete"<<std::endl;
#endif
}
~ThreadPool() {
#ifdef DEBUG
std::cout<<"Destruction Start"<<std::endl;
#endif
isAlive = false;
conditionVariable.notify_all();
waitForExecution();
#ifdef DEBUG
std::cout<<"Destruction Complete"<<std::endl;
#endif
}
void waitForExecution() {
for(std::thread& worker: workerThreads) {
worker.join();
}
}
void addWork(std::function<void()> job) {
#ifdef DEBUG
std::cout<<"Adding work"<<std::endl;
#endif
std::unique_lock<std::mutex> lock(lockListMutex);
jobQueue.push_back(job);
conditionVariable.notify_one();
}
private:
// performs actual work
void doJob() {
// try {
while(isAlive) {
#ifdef DEBUG
std::cout<<"Do Job"<<std::endl;
#endif
std::unique_lock<std::mutex> lock(lockListMutex);
if(!jobQueue.empty()) {
#ifdef DEBUG
std::cout<<"Next Job Found"<<std::endl;
#endif
std::function<void()> job = jobQueue.front();
jobQueue.pop_front();
job();
}
conditionVariable.wait(lock);
}
}
// a vector containing worker threads
std::vector<std::thread> workerThreads;
// a queue for jobs
std::list<std::function<void()>> jobQueue;
// a mutex for synchronized insertion and deletion from list
std::mutex lockListMutex;
std::atomic<bool> isAlive;
// condition variable to track whether or not there is a job in queue
std::condition_variable conditionVariable;
};
我正在从我的主线程向这个线程池添加工作。我的问题是调用waitForExecution()
结果永远等待主线程。我需要能够在所有工作完成后终止线程并从那里继续执行主线程。我应该如何在这里进行?
解决方案
编写健壮线程池的第一步是将队列从线程管理中分离出来。线程安全队列很难自己编写,并且类似地管理线程。
线程安全队列如下所示:
template<class T>
struct threadsafe_queue {
boost::optional<T> pop() {
std::unique_lock<std::mutex> l(m);
cv.wait(l, [&]{ aborted || !data.empty(); } );
if (aborted) return {};
return data.pop_front();
}
void push( T t )
{
std::unique_lock<std::mutex> l(m);
if (aborted) return;
data.push_front( std::move(t) );
cv.notify_one();
}
void abort()
{
std::unique_lock<std::mutex> l(m);
aborted = true;
data = {};
cv.notify_all();
}
~threadsafe_queue() { abort(); }
private:
std::mutex m;
std::condition_variable cv;
std::queue< T > data;
bool aborted = false;
};
where队列中止时pop
返回。nullopt
现在我们的线程池很简单:
struct threadpool {
explicit threadpool(std::size_t n) { add_threads(n); }
threadpool() = default;
~threadpool(){ abort(); }
void add_thread() { add_threads(1); }
void add_threads(std::size_t n)
{
for (std::size_t i = 0; i < n; ++i)
threads.push_back( std::thread( [this]{ do_thread_work(); } ) );
}
template<class F>
auto add_task( F && f )
{
using R = std::result_of_t< F&() >;
auto pptr = std::make_shared<std::promise<R>>();
auto future = pptr.get_future();
tasks.push([pptr]{ (*pptr)(); });
return future;
}
void abort()
{
tasks.abort();
while (!threads.empty()) {
threads.back().join();
threads.pop_back();
}
}
private:
threadsafe_queue< std::function<void()> > tasks;
std::vector< std::thread > threads;
void do_thread_work() {
while (auto f = tasks.pop()) {
(*f)();
}
}
};
请注意,如果您中止,未完成的未来将充满违反承诺的异常。
工作线程在它们提供的队列被中止时停止运行。主线程abort()
将等待工作线程完成(明智的做法)。
这确实意味着工作线程任务也必须终止,否则主线程将挂起。没有办法避免这种情况;通常,您的工作线程的任务需要合作才能获得一条消息,说明它们应该提前中止。
Boost 有一个线程池,它与它的线程原语集成并允许较少合作的中止;在其中,所有互斥锁类型的操作都会隐式检查中止标志,如果他们看到它,操作就会抛出。
推荐阅读
- groovy - 如果存在匹配值,如何循环遍历地图并仅显示一行
- python - 为什么时间没有打印实际时间?
- android - 如何在 Android Studio (LogCat) 中删除 chromium 日志
- python - 尽管代码工作正常,但状态栏上的语法无效
- django - 我在 Visual Studio 代码工作区设置中遇到 pylint 问题
- c++ - 尝试在 VS 2019 中使用 regsvr32 注册的 DLL
- python - 有没有办法使用 MLflow 记录数据集的描述性统计信息?
- java - 连接到多台主机时出现 EclipseLink 异常“连接是只读的”
- html - 使用 br 和 hr 标签后,段落标签属性不起作用
- python - 将具有等于的字典转换为 json 字典