首页 > 解决方案 > 如何等待此线程池中的所有任务完成?

问题描述

我正在尝试写一ThreadPool堂课

class ThreadPool {
  public:
    ThreadPool(size_t numberOfThreads):isAlive(true) {
      for(int i =0; i < numberOfThreads; i++) {
        workerThreads.push_back(std::thread(&ThreadPool::doJob, this));
      }

      #ifdef DEBUG 
      std::cout<<"Construction Complete"<<std::endl; 
      #endif
    }

    ~ThreadPool() {
      #ifdef DEBUG 
      std::cout<<"Destruction Start"<<std::endl; 
      #endif

      isAlive = false;
      conditionVariable.notify_all();
      waitForExecution();

      #ifdef DEBUG 
      std::cout<<"Destruction Complete"<<std::endl; 
      #endif
    }

    void waitForExecution() {
      for(std::thread& worker: workerThreads) {
        worker.join();
      }
    }

    void addWork(std::function<void()> job) {
      #ifdef DEBUG 
      std::cout<<"Adding work"<<std::endl; 
      #endif
      std::unique_lock<std::mutex> lock(lockListMutex);
      jobQueue.push_back(job);
      conditionVariable.notify_one();
    }

  private:
    // performs actual work
    void doJob() {
      // try {
        while(isAlive) {
          #ifdef DEBUG 
          std::cout<<"Do Job"<<std::endl; 
          #endif

          std::unique_lock<std::mutex> lock(lockListMutex);
          if(!jobQueue.empty()) {
            #ifdef DEBUG 
            std::cout<<"Next Job Found"<<std::endl; 
            #endif

            std::function<void()> job = jobQueue.front();
            jobQueue.pop_front();
            job();
          }
          conditionVariable.wait(lock);
        }
    }

    // a vector containing worker threads
    std::vector<std::thread> workerThreads;

    // a queue for jobs
    std::list<std::function<void()>> jobQueue;

    // a mutex for synchronized insertion and deletion from list
    std::mutex lockListMutex;

    std::atomic<bool> isAlive;

    // condition variable to track whether or not there is a job in queue
    std::condition_variable conditionVariable;
};

我正在从我的主线程向这个线程池添加工作。我的问题是调用waitForExecution()结果永远等待主线程。我需要能够在所有工作完成后终止线程并从那里继续执行主线程。我应该如何在这里进行?

标签: c++multithreadingc++11c++14threadpool

解决方案


编写健壮线程池的第一步是将队列从线程管理中分离出来。线程安全队列很难自己编写,并且类似地管理线程。

线程安全队列如下所示:

template<class T>
struct threadsafe_queue {
  boost::optional<T> pop() {
    std::unique_lock<std::mutex> l(m);
    cv.wait(l, [&]{ aborted || !data.empty(); } );
    if (aborted) return {};
    return data.pop_front();
  }

  void push( T t )
  {
    std::unique_lock<std::mutex> l(m);
    if (aborted) return;
    data.push_front( std::move(t) );
    cv.notify_one();
  }

  void abort()
  {
    std::unique_lock<std::mutex> l(m);
    aborted = true;
    data = {};
    cv.notify_all();
  }
  ~threadsafe_queue() { abort(); }
private:
  std::mutex m;
  std::condition_variable cv;
  std::queue< T > data;
  bool aborted = false;
};

where队列中止时pop返回。nullopt

现在我们的线程池很简单:

struct threadpool {
  explicit threadpool(std::size_t n) { add_threads(n); }
  threadpool() = default;
  ~threadpool(){ abort(); }

  void add_thread() { add_threads(1); }
  void add_threads(std::size_t n)
  {
    for (std::size_t i = 0; i < n; ++i)
      threads.push_back( std::thread( [this]{ do_thread_work(); } ) );
  }
  template<class F>
  auto add_task( F && f )
  {
    using R = std::result_of_t< F&() >;
    auto pptr = std::make_shared<std::promise<R>>();
    auto future = pptr.get_future();
    tasks.push([pptr]{ (*pptr)(); });
    return future;
  }
  void abort()
  {
    tasks.abort();
    while (!threads.empty()) {
      threads.back().join();
      threads.pop_back();
    }
  }
private:
  threadsafe_queue< std::function<void()> > tasks;
  std::vector< std::thread > threads;
  void do_thread_work() {
    while (auto f = tasks.pop()) {
      (*f)();
    }
  }
};

请注意,如果您中止,未完成的未来将充满违反承诺的异常。

工作线程在它们提供的队列被中止时停止运行。主线程abort()将等待工作线程完成(明智的做法)。

这确实意味着工作线程任务也必须终止,否则主线程将挂起。没有办法避免这种情况;通常,您的工作线程的任务需要合作才能获得一条消息,说明它们应该提前中止。

Boost 有一个线程池,它与它的线程原语集成并允许较少合作的中止;在其中,所有互斥锁类型的操作都会隐式检查中止标志,如果他们看到它,操作就会抛出。


推荐阅读