c++ - 使用线程池进行递归函数探索,限制队列大小
问题描述
我已经为线程池修改了库 ctpl_stl.h。根据我的问题,我必须使用一个调用自身两次的递归函数,所以我分别从第一次和第二次调用中得到 C1 和 C2。然后在到达函数结束之前,可以比较 C1 和 C2。
这是一个树探索,C1代表左手,C2代表右手。我想设置有限数量的线程,基本上取决于我机器的能力。对于这个测试,我只在池中创建了两个线程。
每次我在使用 pool.push(function, ... arguments...) 时推送一个函数时,我都会将此函数(任务)排入队列中,该队列将在这些线程完成工作后分配给任何可用线程,所以由于左右手的探索,这个队列最多可以增加一半的树的叶子。
在调试我的代码时,我发现在某些时候已经分配给线程 0 和 1 的任务 0 和 1(不是分别因为它们的工作方式)将等待任务的结果,即。2 和 3,但不会执行此任务,因为它们正在等待任何可用线程。所以它基本上陷入了僵局。
所以我认为我通过限制队列大小解决了死锁,那么如果目前没有可用的线程,则不会有任务排队。
然后我修改了线程池库的一部分以限制队列大小,通过一个简单的条件验证它的大小总是在排队之前,如果这个大小小于池中创建的线程,那么这意味着有可用的线程来分配一个新的任务。
这个主意听起来不错,但它不起作用,我很确定它在某个地方陷入僵局。我不知道我的排队限制是否有效(可能没有)。
但是这个库肯定可以工作,因为在创建了相当于一棵树 50% 叶子的线程之后,一切正常。
问题是叶子的数量理论上是由 2^n 决定的,其中 n 是我的样本的大小(代码中输入向量的大小)。所以创建数百万个线程不是一个实际的选择。
对库的所有修改都由 //Modification by me 注释
这是一个研究生课程的项目,有什么想法可能是错的吗?先感谢您
主文件
#include <ctime>
#include <iostream>
#include <vector>
#include <chrono>
#include <map>
#include <algorithm>
#include "ctpl_stl.h"
#include <string>
ctpl::thread_pool pool(2);
int leaves = 0;
int nodes = 0;
std::vector<int> function(int id, std::vector<int>& A, std::vector<int>& B, int left, int right) {
int tmp;
if (A.size() == 0) return B;
//left and right are only values to check while debugging which hand i am exploring, not relevant in the code
std::vector<int> A1 = A;
std::vector<int> A2 = A;
std::vector<int> B1 = B;
std::vector<int> B2 = B;
std::vector<int> C1;
std::vector<int> C2;
bool tempo = pool.check();
std::future<std::vector<int>> C1_tmp;
//left hand exploring
tmp = A1.back();
A1.pop_back();
B1.push_back(tmp + 1);
//Inside the pool, if Queue.size() is smaller than Threads {ie. Pool(2)}
//Then it means there is an available thread, so we add the function to the Queue to be run by this
//available thread. If not, it C1_tmp will return a false and function for C1, will be
//called by current thread.
C1_tmp= pool.push(function, ref(A1), ref(B1),1,0);
if (!C1_tmp.valid()) {
C1 = function(id, A1, B1, 1, 0);
}
//righ hand exploring
tmp = A2.back();
A2.pop_back();
B2.push_back(tmp + 1);
C2 = function(id, A2, B2,0,1);
//Get value from future so the running current thread could explore righ-hand
//while any available thread explore left-hand
if (C1_tmp.valid()) {
C1 = C1_tmp.get();
}
/*Join right Thread with left Thread*/
//There will be some comparison between C1 and C2
leaves++;
return C1; /*or C2, does not matter yet*/
}
int main()
{
std::vector<int> A, B, C;
int val = 10;
for (int i = 0; i < 4; i++)
{
A.push_back(i);
}
clock_t begin = clock();
int tempo = pool.n_idle();
printf("Number of available threads: %i \n", tempo);
C = function(0, A, B, 0, 0);
clock_t end = clock();
double elapsed_secs = double(end - begin) / CLOCKS_PER_SEC;
printf("Number of leaves: %i \n", leaves);
printf("Elapsed time = %.2lf \n", elapsed_secs);
return 0;
}
ctpl_stl.h
/*********************************************************
*
* Copyright (C) 2014 by Vitaliy Vitsentiy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*********************************************************/
#ifndef __ctpl_stl_thread_pool_H__
#define __ctpl_stl_thread_pool_H__
#include <functional>
#include <thread>
#include <atomic>
#include <vector>
#include <memory>
#include <exception>
#include <future>
#include <mutex>
#include <queue>
// thread pool to run user's functors with signature
// ret func(int id, other_params)
// where id is the index of the thread that runs the functor
// ret is some return type
namespace ctpl {
namespace detail {
template <typename T>
class Queue {
public:
bool push(T const& value) {
std::unique_lock<std::mutex> lock(this->mutex);
this->q.push(value);
return true;
}
// deletes the retrieved element, do not use for non integral types
bool pop(T& v) {
std::unique_lock<std::mutex> lock(this->mutex);
if (this->q.empty())
return false;
v = this->q.front();
this->q.pop();
return true;
}
bool empty() {
std::unique_lock<std::mutex> lock(this->mutex);
return this->q.empty();
}
int size() { //modification by me
return q.size();
}
private:
std::queue<T> q;
std::mutex mutex;
};
}
class thread_pool {
public:
thread_pool() { this->init(); }
thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
// the destructor waits for all the functions in the queue to be finished
~thread_pool() {
this->stop(true);
}
//modification by me
bool check() { //Check if queue is smaller than pre-set threads
std::size_t val = q.size();
if (val < threads.size()) {
std::unique_lock<std::mutex> lock(this->mutex);
return true;
}
return false;
}
// get the number of running threads in the pool
int size() { return static_cast<int>(this->threads.size()); }
// number of idle threads
int n_idle() { return this->nWaiting; }
std::thread& get_thread(int i) { return *this->threads[i]; }
// change the number of threads in the pool
// should be called from one thread, otherwise be careful to not interleave, also with this->stop()
// nThreads must be >= 0
void resize(int nThreads) {
if (!this->isStop && !this->isDone) {
int oldNThreads = static_cast<int>(this->threads.size());
if (oldNThreads <= nThreads) { // if the number of threads is increased
this->threads.resize(nThreads);
this->flags.resize(nThreads);
for (int i = oldNThreads; i < nThreads; ++i) {
this->flags[i] = std::make_shared<std::atomic<bool>>(false);
this->set_thread(i);
}
}
else { // the number of threads is decreased
for (int i = oldNThreads - 1; i >= nThreads; --i) {
*this->flags[i] = true; // this thread will finish
this->threads[i]->detach();
}
{
// stop the detached threads that were waiting
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_all();
}
this->threads.resize(nThreads); // safe to delete because the threads are detached
this->flags.resize(nThreads); // safe to delete because the threads have copies of shared_ptr of the flags, not originals
}
}
}
// empty the queue
void clear_queue() {
std::function<void(int id)>* _f;
while (this->q.pop(_f))
delete _f; // empty the queue
}
// pops a functional wrapper to the original function
std::function<void(int)> pop() {
std::function<void(int id)>* _f = nullptr;
this->q.pop(_f);
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
std::function<void(int)> f;
if (_f)
f = *_f;
return f;
}
// wait for all computing threads to finish and stop all threads
// may be called asynchronously to not pause the calling thread while waiting
// if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
void stop(bool isWait = false) {
if (!isWait) {
if (this->isStop)
return;
this->isStop = true;
for (int i = 0, n = this->size(); i < n; ++i) {
*this->flags[i] = true; // command the threads to stop
}
this->clear_queue(); // empty the queue
}
else {
if (this->isDone || this->isStop)
return;
this->isDone = true; // give the waiting threads a command to finish
}
{
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_all(); // stop all waiting threads
}
for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) { // wait for the computing threads to finish
if (this->threads[i]->joinable())
this->threads[i]->join();
}
// if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
// therefore delete them here
this->clear_queue();
this->threads.clear();
this->flags.clear();
}
template<typename F, typename... Rest>
auto push(F&& f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
//Modification by me--------------------RELEVANT CHANGE
if (!check()) { //mine, function returns false value if Queue.size() < Number of threads
std::promise<std::vector<int>> prom;
prom.~promise();
return prom.get_future();
}
//-----------------------------------------------
auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
);
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
// run the user's function that excepts argument int - id of the running thread. returned value is templatized
// operator returns std::future, where the user can get the result and rethrow the catched exceptins
template<typename F>
auto push(F&& f) ->std::future<decltype(f(0))> {
auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
auto _f = new std::function<void(int id)>([pck](int id) {
(*pck)(id);
});
this->q.push(_f);
std::unique_lock<std::mutex> lock(this->mutex);
this->cv.notify_one();
return pck->get_future();
}
private:
// deleted
thread_pool(const thread_pool&);// = delete;
thread_pool(thread_pool&&);// = delete;
thread_pool& operator=(const thread_pool&);// = delete;
thread_pool& operator=(thread_pool&&);// = delete;
void set_thread(int i) {
std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
std::atomic<bool>& _flag = *flag;
std::function<void(int id)>* _f;
bool isPop = this->q.pop(_f);
while (true) {
while (isPop) { // if there is anything in the queue
std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
(*_f)(i);
if (_flag)
return; // the thread is wanted to stop, return even if the queue is not empty yet
else
isPop = this->q.pop(_f);
}
// the queue is empty here, wait for the next command
std::unique_lock<std::mutex> lock(this->mutex);
++this->nWaiting;
this->cv.wait(lock, [this, &_f, &isPop, &_flag]() { isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
--this->nWaiting;
if (!isPop)
return; // if the queue is empty and this->isDone == true or *flag then return
}
};
this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
}
void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
std::vector<std::unique_ptr<std::thread>> threads;
std::vector<std::shared_ptr<std::atomic<bool>>> flags;
detail::Queue<std::function<void(int id)>*> q;
std::atomic<bool> isDone;
std::atomic<bool> isStop;
std::atomic<int> nWaiting; // how many threads are waiting
std::mutex mutex;
std::condition_variable cv;
//Modification by me
int numThreads() {
return threads.size();
}
};
}
#endif // __ctpl_stl_thread_pool_H__
解决方案
推荐阅读
- postgresql - 如何将postgres的列日期更新为提前几天
- javascript - 当我尝试它时颜色不会改变 ti 改变
- arrays - 数组天气API
- ruby - | 之间有什么规则吗?x | 在块函数中?
- excel - 无法以 SYSTEM 身份运行 Excel COM 对象
- java - 如何在我的 android 应用程序中访问我的 Spring Boot Websocket (Stomp)?
- css - 水平滚动时如何实现SVG圆的绘制
- react-native - 反应原生 Firebase 存储在发布版本中无法正常工作
- python - Pyspark 从外部文件导入字典
- c# - 调用的表单不是从表单代码的开头开始