首页 > 解决方案 > C++ 类中的简单工作线程

问题描述

假设有一个类包含一些数据并计算一些给定查询的结果,并且查询需要相对大量的时间。

一个示例类(一切都是假的)是:

#include <vector>
#include <numeric>
#include <thread>

struct do_some_work
{
    do_some_work(std::vector<int> data) 
        : _data(std::move(data))
        , _current_query(0)
        , _last_calculated_result(0) 
    {}
    void update_query(size_t x) {
        if (x < _data.size()) {
            _current_query = x;
            recalculate_result();
        }
    }
    int get_result() const {
        return _last_calculated_result;
    }
private:
    void recalculate_result() {
        //dummy some work here     
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        _last_calculated_result = std::accumulate(_data.cbegin(), _data.cbegin() + _current_query, 0);
    }

    std::vector<int> const _data;
    size_t _current_query;
    int _last_calculated_result;
};

这可以在主代码中使用,例如:

#include <algorithm>

int main()
{
    //make some dummy data
    std::vector<int> test_data(20, 0);
    std::iota(test_data.begin(), test_data.end(), 0);

    {
        do_some_work work(test_data);
        for (size_t i = 0; i < test_data.size(); ++i) {
            work.update_query(i);
            std::cout << "result = {" << i << "," <<  work.get_result() << "}" << std::endl;
        }
    }
}

以上会在main函数中等待很多。

现在,假设我们想在一个紧密的循环中运行这个查询(比如 GUI),并且只关心在查询时快速获得“最近”的结果。

所以,我们想把工作转移到一个单独的线程来计算结果,并更新它,当我们得到结果时,我们得到最后一个计算的结果。也就是说,我们希望更改do_some_work类以在线程上完成其工作,而更改最少(基本上找到一种可以应用于(大部分)这种类型的任何类的更改模式)。

我对此的尝试如下:

#include <vector>
#include <numeric>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>

struct do_lots_of_work
{
    do_lots_of_work(std::vector<int> data) 
        : _data(std::move(data))        
        , _current_query(0)
        , _last_calculated_result(0)
        , _worker()
        , _data_mtx()
        , _result_mtx()
        , _cv()
        , _do_exit(false)
        , _work_available(false)
    {
        start_worker();
    }
    void update_query(size_t x) {
        {
            if (x < _data.size()) {
                std::lock_guard<std::mutex> lck(_data_mtx);
                _current_query = x;
                _work_available = true;
                _cv.notify_one();
            }
        }        
    }
    int get_result() const {
        std::lock_guard<std::mutex> lck(_result_mtx);
        return _last_calculated_result;
    }

    ~do_lots_of_work() {
        stop_worker();
    }

private:
    void start_worker() {
        if (!_worker.joinable()) {
            std::cout << "starting worker..." << std::endl;
            _worker = std::thread(&do_lots_of_work::worker_loop, this);
        }
    }

    void stop_worker() {
        std::cout << "worker stopping..." << std::endl;
        if (_worker.joinable()) {
            std::unique_lock<std::mutex> lck(_data_mtx);
            _do_exit = true;
            lck.unlock();
            _cv.notify_one();            
            _worker.join();
        }
        std::cout << "worker stopped" << std::endl;
    }

    void worker_loop() {
        std::cout << "worker started" << std::endl;
        while (true) {
            std::unique_lock<std::mutex> lck(_data_mtx);
            _cv.wait(lck, [this]() {return _work_available || _do_exit; });
            if (_do_exit) { break; }
            if (_work_available) {
                _work_available = false;
                int query = _current_query; //take local copy
                lck.unlock(); //unlock before doing lots of work.
                recalculate_result(query);                
            }
        }
    }

    void recalculate_result(int query) {
        //dummy lots of work here
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        int const result = std::accumulate(_data.cbegin(), _data.cbegin() + query, 0);    
        set_result(result);
    }

    void set_result(int result) {
        std::lock_guard<std::mutex> lck(_result_mtx);
        _last_calculated_result = result;
    }
    
    std::vector<int> const  _data;
    size_t                  _current_query;
    int                     _last_calculated_result;
    
    std::thread             _worker;
    mutable std::mutex      _data_mtx;
    mutable std::mutex      _result_mtx;
    std::condition_variable _cv;
    bool                    _do_exit;
    bool                    _work_available;
};

用法是(示例):

#include <algorithm>

int main()
{
    //make some dummy data
    std::vector<int> test_data(20, 0);
    std::iota(test_data.begin(), test_data.end(), 0);

    {
        do_lots_of_work work(test_data);
        for (size_t i = 0; i < test_data.size(); ++i) {            
            work.update_query(i);
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
            std::cout << "result = {" << i << "," << work.get_result() << "}" << std::endl;
        }
    }
}

这似乎有效,给出了最后一个结果,而不是停止主要功能等。

但是,这看起来需要很多更改才能将工作线程添加到一个简单的类中,例如do_some_work. 像两个mutexes(一个用于工作人员/主要交互数据,一个用于结果)、一个condition_variable、一个more-work-available标志和一个do-exit标志之类的项目,这是相当多的。我想我们不想要async一种机制,因为我们不想每次都潜在地启动一个新线程。

现在,我不确定是否有更简单的模式来进行这种改变,但感觉应该有。一种可用于将工作卸载到线程的模式。

所以最后,我的问题是,可以以比上面的实现更简单的方式do_some_work转换成吗?do_lots_of_work

编辑(解决方案 1)基于 ThreadPool:

使用线程池,可以跳过工作循环,我们需要两个互斥锁,用于结果和查询。锁定更新查询,锁定获取结果,两者都锁定重新计算(获取查询的本地副本,并写入结果)。

注意:另外,在将工作推入队列时,由于我们不关心较旧的结果,我们可以清除工作队列。

示例实现(使用 CTPL 线程池)

#include "CTPL\ctpl_stl.h"
#include <vector>
#include <mutex>

struct do_lots_of_work_with_threadpool
{
    do_lots_of_work_with_threadpool(std::vector<int> data)
        : _data(std::move(data))
        , _current_query(0)
        , _last_calculated_result(0)
        , _pool(1)
        , _result_mtx()
        , _query_mtx()
    {
        
    }
    void update_query(size_t x) {
        if (x < _data.size()) {
            std::lock_guard<std::mutex> lck(_query_mtx);
            _current_query = x;            
        }
        _pool.clear_queue(); //clear as we don't want to calculate any out-date results.
        _pool.push([this](int id) { recalculate_result(); });
    }
    int get_result() const {
        std::lock_guard<std::mutex> lck(_result_mtx);
        return _last_calculated_result;
    }
private:
    void recalculate_result() {                
        //dummy some work here     
        size_t query;
        {
            std::lock_guard<std::mutex> lck(_query_mtx);
            query = _current_query;
        }
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        int result = std::accumulate(_data.cbegin(), _data.cbegin() + query, 0);
        {
            std::lock_guard<std::mutex> lck(_result_mtx);
            _last_calculated_result = result;
        }
    }

    std::vector<int> const _data;
    size_t _current_query;
    int _last_calculated_result;

    ctpl::thread_pool _pool;
    mutable std::mutex _result_mtx;
    mutable std::mutex _query_mtx;
};

使用 ThreadPool 和 Atomic 编辑(解决方案 2):

该解决方案将共享变量更改为原子,因此我们不需要任何互斥锁,也不必考虑获取/释放锁等。这更简单并且非常接近原始类(当然假设线程池类型存在于某处它不是标准的一部分)。

#include "CTPL\ctpl_stl.h"
#include <vector>
#include <mutex>
#include <atomic>

struct do_lots_of_work_with_threadpool_and_atomics
{
    do_lots_of_work_with_threadpool_and_atomics(std::vector<int> data)
        : _data(std::move(data))
        , _current_query(0)
        , _last_calculated_result(0)
        , _pool(1)
    {

    }
    void update_query(size_t x) {
        if (x < _data.size()) {            
            _current_query.store(x);
        }
        _pool.clear_queue(); //clear as we don't want to calculate any out-date results.
        _pool.push([this](int id) { recalculate_result(); });
    }
    int get_result() const {        
        return _last_calculated_result.load();
    }
private:
    void recalculate_result() {
        //dummy some work here             
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        _last_calculated_result.store(std::accumulate(_data.cbegin(), _data.cbegin() + _current_query.load(), 0));                
    }

    std::vector<int> const _data;
    std::atomic<size_t> _current_query;
    std::atomic<int> _last_calculated_result;

    ctpl::thread_pool _pool;
};

标签: c++multithreadingdesign-patternsarchitecturec++17

解决方案


推荐阅读