Recursive function exploration with a thread pool, limiting the queue size

Problem description

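I have modified the ctpl_stl.h library for my thread pool. My problem requires a recursive function that calls itself twice, so I get C1 from the first call and C2 from the second. C1 and C2 can then be compared before the function returns.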

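This is a tree exploration in which C1 represents the left-hand branch and C2 the right-hand branch. I want a limited number of threads, chosen essentially according to what my machine can handle. For this test I created only two threads in the pool.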

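Every time I push a function with pool.push(function, ... arguments...), the function (a task) is placed in a queue and is handed to whichever thread becomes available once it finishes its current work. Because of the left-hand and right-hand exploration, this queue can grow to as much as half the number of leaves of the tree.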

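While debugging, I found that at some point tasks 0 and 1, already assigned to threads 0 and 1 (not necessarily respectively, because of how the pool works), wait for the results of other tasks, i.e. 2 and 3. Those tasks never run, because they are themselves waiting for an available thread. So the program is effectively deadlocked.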
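
The situation described above can be reproduced in miniature. The following is only a sketch under stated assumptions: `TinyPool` is a hypothetical one-worker pool written for this illustration (it is not ctpl), and the parent task uses `wait_for` with a timeout instead of `get()` so that the example terminates instead of hanging.

```cpp
#include <chrono>
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical one-worker pool, written only for this illustration (not ctpl).
class TinyPool {
public:
    TinyPool() : worker_([this] { run(); }) {}
    ~TinyPool() {
        {
            std::lock_guard<std::mutex> lock(m_);
            done_ = true;
        }
        cv_.notify_all();
        worker_.join();
    }
    // Queue a task and return a future for its result.
    std::future<int> submit(std::function<int()> fn) {
        auto task = std::make_shared<std::packaged_task<int()>>(std::move(fn));
        std::future<int> result = task->get_future();
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push([task] { (*task)(); });
        }
        cv_.notify_one();
        return result;
    }

private:
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> lock(m_);
                cv_.wait(lock, [this] { return done_ || !q_.empty(); });
                if (q_.empty()) return;  // shutting down and nothing left to run
                job = std::move(q_.front());
                q_.pop();
            }
            job();
        }
    }
    std::mutex m_;
    std::condition_variable cv_;
    std::queue<std::function<void()>> q_;
    bool done_ = false;
    std::thread worker_;  // declared last so the other members exist before it starts
};

// The parent task queues a child and waits for it while occupying the only worker.
// Returns 1 if the wait timed out, 0 if the child managed to finish.
int parent_starves_child() {
    TinyPool pool;
    std::future<int> parent = pool.submit([&pool] {
        std::future<int> child = pool.submit([] { return 42; });
        // With child.get() here the worker would block forever.
        auto status = child.wait_for(std::chrono::milliseconds(100));
        return status == std::future_status::timeout ? 1 : 0;
    });
    return parent.get();
}
```

Calling `parent_starves_child()` from `main` should report the timeout, because the child can only start once the parent has given the worker back. With two workers and two waiting parents the same thing happens one level deeper.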

So I thought I could solve the deadlock by limiting the queue size: if no thread is currently available, no task is queued.

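I then modified part of the thread pool library to limit the queue size. A simple condition checks the queue size before every enqueue; if the size is smaller than the number of threads created in the pool, I take that to mean a thread is available to take a new task.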
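
One caveat about this condition: a task that a thread has already popped is no longer in the queue, so the queue can be empty while every thread is busy, and "queue size < number of threads" does not by itself show that a thread is idle. A different way to get "hand off only when capacity is free, otherwise run inline" is sketched below. It is a swapped-in technique, not the ctpl approach: it uses `std::async` with a counter of free slots, and the names `free_slots`, `try_acquire_slot`, `explore` and `count_leaves` are hypothetical. The slot is not released if `explore` throws, which a real version would need to handle.

```cpp
#include <atomic>
#include <future>
#include <vector>

// Hypothetical names; this sketch uses std::async rather than ctpl.
std::atomic<int> free_slots{2};  // extra threads allowed to run at the same time
std::atomic<int> leaf_count{0};  // leaves reached so far

// Reserve one slot without blocking; returns false when none is free.
bool try_acquire_slot() {
    int current = free_slots.load();
    while (current > 0) {
        // On failure, compare_exchange_weak reloads `current` and the loop retries.
        if (free_slots.compare_exchange_weak(current, current - 1)) return true;
    }
    return false;
}

// Binary exploration: each call drops one element and explores two branches.
void explore(std::vector<int> a) {
    if (a.empty()) {
        ++leaf_count;
        return;
    }
    a.pop_back();

    std::future<void> left;
    if (try_acquire_slot()) {
        // A thread starts immediately, so the parent never waits on a queued task.
        left = std::async(std::launch::async, [a] {
            explore(a);
            ++free_slots;  // give the slot back once this branch is done
        });
    } else {
        explore(a);  // no slot free: explore the left branch on this thread
    }

    explore(a);  // right branch always runs on the current thread
    if (left.valid()) left.get();
}

// Convenience wrapper: number of leaves reached for an input of size n.
int count_leaves(int n) {
    leaf_count = 0;
    explore(std::vector<int>(n, 0));
    return leaf_count;
}
```

For an input of size 4 this visits 16 leaves while never running more than two extra threads at once. The design point is that a thread only ever waits on work that is already running somewhere, never on work that is still waiting for a thread.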

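The idea sounds fine, but it does not work, and I am fairly sure it is still deadlocking somewhere. I do not know whether my queue limit actually works (probably not).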

The library itself certainly works, though: when I create as many threads as 50% of the tree's leaves, everything runs correctly.

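The problem is that the number of leaves is, in theory, 2^n, where n is the size of my sample (the size of the input vector in the code). So creating millions of threads is not a practical option.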
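
To make the growth concrete, here is a small sketch of the arithmetic. The function names are hypothetical. It assumes a full binary tree of depth n, which matches a function that removes one element per level and branches twice; in that tree every non-leaf call pushes one left-hand task, so the number of pushes is one less than the number of leaves.

```cpp
#include <cstdint>

// Leaves of a full binary tree of depth n (only meaningful for n < 64).
constexpr std::uint64_t leaves_for(unsigned n) {
    return std::uint64_t{1} << n;
}

// Non-leaf calls in that tree; each of them pushes one left-hand task.
constexpr std::uint64_t pushes_for(unsigned n) {
    return leaves_for(n) - 1;
}
```

With the 4-element input used in the test, `leaves_for(4)` is 16 and `pushes_for(4)` is 15. At n = 20 the leaf count already exceeds one million, which is why sizing the pool to the tree does not scale.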

Every modification I made to the library is marked with a "Modification by me" comment.

This is a project for a graduate course. Any ideas about what might be wrong? Thanks in advance.

Main file

#include <cstdio>   // printf
#include <ctime>
#include <iostream>
#include <vector>
#include <chrono>
#include <map>
#include <algorithm>
#include "ctpl_stl.h"
#include <string>

ctpl::thread_pool pool(2);

int leaves = 0;
int nodes = 0;

std::vector<int> function(int id, std::vector<int>& A, std::vector<int>& B, int left, int right) {
    int tmp;
    if (A.size() == 0) return B;
    //left and right are only values to check while debugging which hand i am exploring, not relevant in the code

    std::vector<int> A1 = A;
    std::vector<int> A2 = A;
    std::vector<int> B1 = B;
    std::vector<int> B2 = B;
    std::vector<int> C1;
    std::vector<int> C2;

    bool tempo = pool.check();
    std::future<std::vector<int>> C1_tmp;

    //left hand exploring
    tmp = A1.back();
    A1.pop_back();
    B1.push_back(tmp + 1);
    //Inside the pool, if Queue.size() is smaller than the number of threads {i.e. Pool(2)},
    //a thread is assumed to be available, so the function is added to the queue to be run by it.
    //If not, C1_tmp comes back as an invalid future and the function for C1 is
    //called by the current thread.
    C1_tmp = pool.push(function, std::ref(A1), std::ref(B1), 1, 0);
    if (!C1_tmp.valid()) {
        C1 = function(id, A1, B1, 1, 0);
    }


    //right hand exploring
    tmp = A2.back();
    A2.pop_back();
    B2.push_back(tmp + 1);
    C2 = function(id, A2, B2,0,1);
    //Get the value from the future only now, so the current thread could explore the right hand
    //while any available thread explores the left hand
    if (C1_tmp.valid()) {
        C1 = C1_tmp.get();  
    }

    /*Join right Thread with left Thread*/

    //There will be some comparison between C1 and C2

    leaves++;
    return C1; /*or C2, does not matter yet*/
}

int main()
{
    std::vector<int> A, B, C;
    int val = 10;

    for (int i = 0; i < 4; i++)
    {
        A.push_back(i);
    }

    clock_t begin = clock();
    int tempo = pool.n_idle();
    printf("Number of available threads: %i \n", tempo);
    C = function(0, A, B, 0, 0);
    clock_t end = clock();
    double elapsed_secs = double(end - begin) / CLOCKS_PER_SEC;

    printf("Number of leaves: %i \n", leaves);
    printf("Elapsed time = %.2lf \n", elapsed_secs);

    return 0;
}
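
A side note on the counter in the listing above: `leaves` is a plain `int` that is incremented from whichever thread happens to run the function, which is a data race once more than one thread is involved. As written, the increment also sits after the early return for an empty `A`, so it runs once per non-leaf call rather than once per leaf. A minimal sketch of a race-free counter follows; `visited` and `count_from_threads` are hypothetical names used only for this illustration.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical replacement for the global `leaves` counter.
std::atomic<int> visited{0};

// Increment the shared counter from several threads and return the total.
int count_from_threads(int num_threads, int increments_per_thread) {
    visited = 0;
    std::vector<std::thread> workers;
    for (int t = 0; t < num_threads; ++t) {
        workers.emplace_back([increments_per_thread] {
            for (int i = 0; i < increments_per_thread; ++i) {
                // Atomic read-modify-write: no increment is lost.
                visited.fetch_add(1, std::memory_order_relaxed);
            }
        });
    }
    for (auto& w : workers) w.join();  // join makes all increments visible here
    return visited.load();
}
```

Relaxed ordering is enough here because the value is only read after every thread has been joined.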

ctpl_stl.h

/*********************************************************
*
*  Copyright (C) 2014 by Vitaliy Vitsentiy
*
*  Licensed under the Apache License, Version 2.0 (the "License");
*  you may not use this file except in compliance with the License.
*  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*
*********************************************************/


#ifndef __ctpl_stl_thread_pool_H__
#define __ctpl_stl_thread_pool_H__

#include <functional>
#include <thread>
#include <atomic>
#include <vector>
#include <memory>
#include <exception>
#include <future>
#include <mutex>
#include <queue>



// thread pool to run user's functors with signature
//      ret func(int id, other_params)
// where id is the index of the thread that runs the functor
// ret is some return type


namespace ctpl {

    namespace detail {
        template <typename T>
        class Queue {
        public:
            bool push(T const& value) {
                std::unique_lock<std::mutex> lock(this->mutex);
                this->q.push(value);
                return true;
            }
            // deletes the retrieved element, do not use for non integral types
            bool pop(T& v) {
                std::unique_lock<std::mutex> lock(this->mutex);
                if (this->q.empty())
                    return false;
                v = this->q.front();
                this->q.pop();
                return true;
            }
            bool empty() {
                std::unique_lock<std::mutex> lock(this->mutex);
                return this->q.empty();
            }
            int size() {    //modification by me
                return q.size();
            }

        private:
            std::queue<T> q;
            std::mutex mutex;
        };
    }

    class thread_pool {

    public:

        thread_pool() { this->init(); }
        thread_pool(int nThreads) { this->init(); this->resize(nThreads); }

        // the destructor waits for all the functions in the queue to be finished
        ~thread_pool() {
            this->stop(true);
        }
        //modification by me
        bool check() {  //Check if queue is smaller than pre-set threads
            std::size_t val = q.size();
            if (val < threads.size()) {
                std::unique_lock<std::mutex> lock(this->mutex);
                return true;
            }
            return false;
        }

        // get the number of running threads in the pool
        int size() { return static_cast<int>(this->threads.size()); }

        // number of idle threads
        int n_idle() { return this->nWaiting; }
        std::thread& get_thread(int i) { return *this->threads[i]; }

        // change the number of threads in the pool
        // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
        // nThreads must be >= 0
        void resize(int nThreads) {
            if (!this->isStop && !this->isDone) {
                int oldNThreads = static_cast<int>(this->threads.size());
                if (oldNThreads <= nThreads) {  // if the number of threads is increased
                    this->threads.resize(nThreads);
                    this->flags.resize(nThreads);

                    for (int i = oldNThreads; i < nThreads; ++i) {
                        this->flags[i] = std::make_shared<std::atomic<bool>>(false);
                        this->set_thread(i);
                    }
                }
                else {  // the number of threads is decreased
                    for (int i = oldNThreads - 1; i >= nThreads; --i) {
                        *this->flags[i] = true;  // this thread will finish
                        this->threads[i]->detach();
                    }
                    {
                        // stop the detached threads that were waiting
                        std::unique_lock<std::mutex> lock(this->mutex);
                        this->cv.notify_all();
                    }
                    this->threads.resize(nThreads);  // safe to delete because the threads are detached
                    this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
                }
            }
        }

        // empty the queue
        void clear_queue() {
            std::function<void(int id)>* _f;
            while (this->q.pop(_f))
                delete _f; // empty the queue
        }

        // pops a functional wrapper to the original function
        std::function<void(int)> pop() {
            std::function<void(int id)>* _f = nullptr;
            this->q.pop(_f);
            std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
            std::function<void(int)> f;
            if (_f)
                f = *_f;
            return f;
        }

        // wait for all computing threads to finish and stop all threads
        // may be called asynchronously to not pause the calling thread while waiting
        // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
        void stop(bool isWait = false) {
            if (!isWait) {
                if (this->isStop)
                    return;
                this->isStop = true;
                for (int i = 0, n = this->size(); i < n; ++i) {
                    *this->flags[i] = true;  // command the threads to stop
                }
                this->clear_queue();  // empty the queue
            }
            else {
                if (this->isDone || this->isStop)
                    return;
                this->isDone = true;  // give the waiting threads a command to finish
            }
            {
                std::unique_lock<std::mutex> lock(this->mutex);
                this->cv.notify_all();  // stop all waiting threads
            }
            for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
                if (this->threads[i]->joinable())
                    this->threads[i]->join();
            }
            // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
            // therefore delete them here
            this->clear_queue();
            this->threads.clear();
            this->flags.clear();
        }

        template<typename F, typename... Rest>
        auto push(F&& f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
            //Modification by me--------------------RELEVANT CHANGE
            if (!check()) { //mine, check() returns false when Queue.size() >= number of threads, so the task is not queued
                std::promise<std::vector<int>> prom;
                prom.~promise();
                return prom.get_future();
            }
            //-----------------------------------------------
            auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
                std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
                );



            auto _f = new std::function<void(int id)>([pck](int id) {
                (*pck)(id);
                });

            this->q.push(_f);
            std::unique_lock<std::mutex> lock(this->mutex);
            this->cv.notify_one();


            return pck->get_future();
        }

        // run the user's function that accepts argument int - id of the running thread. returned value is templatized
        // operator returns std::future, where the user can get the result and rethrow the caught exceptions
        template<typename F>
        auto push(F&& f) ->std::future<decltype(f(0))> {
            auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
            auto _f = new std::function<void(int id)>([pck](int id) {
                (*pck)(id);
                });
            this->q.push(_f);
            std::unique_lock<std::mutex> lock(this->mutex);
            this->cv.notify_one();
            return pck->get_future();
        }


    private:

        // deleted
        thread_pool(const thread_pool&);// = delete;
        thread_pool(thread_pool&&);// = delete;
        thread_pool& operator=(const thread_pool&);// = delete;
        thread_pool& operator=(thread_pool&&);// = delete;

        void set_thread(int i) {
            std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
            auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
                std::atomic<bool>& _flag = *flag;
                std::function<void(int id)>* _f;
                bool isPop = this->q.pop(_f);
                while (true) {
                    while (isPop) {  // if there is anything in the queue
                        std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
                        (*_f)(i);
                        if (_flag)
                            return;  // the thread is wanted to stop, return even if the queue is not empty yet
                        else
                            isPop = this->q.pop(_f);
                    }
                    // the queue is empty here, wait for the next command
                    std::unique_lock<std::mutex> lock(this->mutex);
                    ++this->nWaiting;
                    this->cv.wait(lock, [this, &_f, &isPop, &_flag]() { isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
                    --this->nWaiting;
                    if (!isPop)
                        return;  // if the queue is empty and this->isDone == true or *flag then return
                }
            };
            this->threads[i].reset(new std::thread(f)); // compiler may not support std::make_unique()
        }

        void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }

        std::vector<std::unique_ptr<std::thread>> threads;
        std::vector<std::shared_ptr<std::atomic<bool>>> flags;
        detail::Queue<std::function<void(int id)>*> q;
        std::atomic<bool> isDone;
        std::atomic<bool> isStop;
        std::atomic<int> nWaiting;  // how many threads are waiting

        std::mutex mutex;
        std::condition_variable cv;

        //Modification by me
        int numThreads() {
            return threads.size();
        }
    };

}

#endif // __ctpl_stl_thread_pool_H__
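
One detail in the modified `push` deserves a closer look. Calling `prom.~promise()` and then `prom.get_future()` uses an object after its destructor has run, which is undefined behavior, and the promise type is fixed to `std::vector<int>` even though `push` is a template. If the goal is only to return a future for which `valid()` is false, a default-constructed `std::future` does that in a well-defined way. The sketch below shows the idea outside of ctpl; `rejected` and `accepted` are hypothetical helper names, and inside `push` the type would presumably be `std::future<decltype(f(0, rest...))>`.

```cpp
#include <future>
#include <vector>

// Hypothetical stand-in for the "task was not queued" branch of push().
template <typename T>
std::future<T> rejected() {
    return std::future<T>{};  // default-constructed: valid() is false
}

// For contrast: a future that is attached to a shared state and holds a value.
std::future<std::vector<int>> accepted() {
    std::promise<std::vector<int>> prom;
    prom.set_value({1, 2, 3});
    return prom.get_future();  // the shared state outlives the promise
}
```

The caller's existing `if (!C1_tmp.valid())` test then works as intended. This only removes the undefined behavior; it does not change when `check()` decides to reject a task.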

Tags: c++, multithreading, recursion, queue, threadpool

Solution

