首页 > 解决方案 > 如何等到池中的所有线程结束工作?

问题描述

我正在尝试使用 boost 库实现简单的线程池。

这是代码:

//boost::asio::io_service ioService;
//boost::thread_group pool;
//boost::asio::io_service::work* worker;

ThreadPool::ThreadPool(int poolSize /*= boost::thread::hardware_concurrency()*/)
{
    if (poolSize >= 1 && poolSize <= boost::thread::hardware_concurrency())
        threadAmount = poolSize;
    else
        threadAmount = 1;

    worker = NULL;
}

ThreadPool::~ThreadPool()
{
    if (worker != NULL && !ioService.stopped())
    {
        _shutdown();
        delete worker;
        worker = NULL;
    }
}

void ThreadPool::start()
{
    if (worker != NULL)
    {
        return;
    }

    worker = new boost::asio::io_service::work(ioService);

    for (int i = 0; i < threadAmount; ++i)
    {
        pool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    }
}

template<class F, class...Args>
void ThreadPool::execute(F f, Args&&... args)
{
    ioService.post(boost::bind(f, std::forward<Args>(args)...));
}

void ThreadPool::shutdown()
{
    pool.interrupt_all();
    _shutdown();
}

void ThreadPool::join_all()
{
// wait for all threads before continue
// in other words - barier for all threads when they finished all jobs
// and to be able re-use them in futur.
}

void ThreadPool::_shutdown()
{
    ioService.reset();
    ioService.stop();
}

在我的程序中,我将一些需要完成的任务分配给线程池,并进一步使用主线程。在某些时候,我需要等待所有线程完成所有任务,然后才能继续计算。有没有办法做到这一点?

非常感谢。

标签: c++multithreadingboost

解决方案


正如其他人指出的那样,罪魁祸首是work实例。

我会大大简化接口(实际上没有理由在析构函数中拆分shutdownshutdown,_shutdownjoin_all一些随机逻辑。这只会让人很难知道责任在哪里。

界面应该是一个成功的坑- 易于使用正确,难以使用错误。

同时,它使正确实现它变得更加容易。

这是第一个刺:

住在科利鲁

#include <boost/asio.hpp>
#include <boost/thread.hpp>

namespace ba = boost::asio;

struct ThreadPool {
    ThreadPool(unsigned poolSize = boost::thread::hardware_concurrency());
    ~ThreadPool();
    void start();

    template <typename F, typename... Args>
    void execute(F f, Args&&... args) {
        ioService.post(std::bind(f, std::forward<Args>(args)...));
    }
  private:
    unsigned threadAmount;
    ba::io_service ioService;
    boost::thread_group pool;
    std::unique_ptr<ba::io_service::work> work;
    void shutdown();
};

ThreadPool::ThreadPool(
    unsigned poolSize /*= boost::thread::hardware_concurrency()*/) {
    threadAmount = std::max(1u, poolSize);
    threadAmount = std::min(boost::thread::hardware_concurrency(), poolSize);
}

ThreadPool::~ThreadPool() {
    shutdown();
}

void ThreadPool::start() {
    if (!work) {
        work = std::make_unique<ba::io_service::work>(ioService);

        for (unsigned i = 0; i < threadAmount; ++i) {
            pool.create_thread(
                boost::bind(&ba::io_service::run, &ioService));
        }
    }
}

void ThreadPool::shutdown() {
    work.reset();

    pool.interrupt_all();
    ioService.stop();

    pool.join_all();

    ioService.reset();
}

#include <iostream>
using namespace std::chrono_literals;

int main() {
    auto now = std::chrono::high_resolution_clock::now;
    auto s = now();
    {
        ThreadPool p(10);
        p.start();

        p.execute([] { std::this_thread::sleep_for(1s); });
        p.execute([] { std::this_thread::sleep_for(600ms); });
        p.execute([] { std::this_thread::sleep_for(400ms); });
        p.execute([] { std::this_thread::sleep_for(200ms); });
        p.execute([] { std::this_thread::sleep_for(10ms); });
    }
    std::cout << "Total elapsed: " << (now() - s) / 1.0s << "s\n";
}

在大多数多核系统上,哪个会打印出类似我的东西:

Total elapsed: 1.00064s

如果超过 hardware_concurrency threadAmount,您在计算要采取的位置时似乎有错误。1poolSize

老实说,为什么要bind在执行中?它真的不会增加很多,你可以把它留给调用者,他们可以选择是否使用绑定,如果是,是boost::bindstd::bind还是其他一些组合可调用对象的方式:

template <typename F>
void execute(F f) { ioService.post(f); }

您缺少围绕io_service::run调用的异常处理(请参阅是否应该捕获 boost::asio::io_service::run() 引发的异常?)。

如果您使用的是最新的 boost 版本,则可以使用较新的io_contextthread_pool接口,大大简化了事情:

住在科利鲁

#include <boost/asio.hpp>

struct ThreadPool {
    ThreadPool(unsigned poolSize)
        : pool(std::clamp(poolSize, 1u, std::thread::hardware_concurrency()))
    { }

    template <typename F>
    void execute(F f) { post(pool, f); }
  private:
    boost::asio::thread_pool pool;
};

这仍然具有 99% 的功能¹,但在 10 LoC 中。

事实上,这个类已经变成了一个简单的包装器,所以我们可以这样写:

住在科利鲁

#include <boost/asio.hpp>
#include <iostream>
using namespace std::chrono_literals;
using C = std::chrono::high_resolution_clock;

static void sleep_for(C::duration d) { std::this_thread::sleep_for(d); }

int main() {
    auto s = C::now();
    {
        boost::asio::thread_pool pool;

        post(pool, [] { sleep_for(1s); });
        post(pool, [] { sleep_for(600ms); });
        // still can bind if you want
        post(pool, std::bind(sleep_for, 400ms));
        post(pool, std::bind(sleep_for, 200ms));
        post(pool, std::bind(sleep_for, 10ms));

        //pool.join(); // implicit in destructor
    }
    std::cout << "Total elapsed: " << (C::now() - s) / 1.0s << "s\n";
}

主要区别在于默认池大小:它是 2*硬件并发(但也更安全地计算,因为并非所有平台都有可靠的 hardware_concurrency() - 它可能是零,例如)。


¹它目前不行使中断点


推荐阅读