c++ - C++ 类中的简单工作线程
问题描述
假设有一个类包含一些数据并计算一些给定查询的结果,并且查询需要相对大量的时间。
一个示例类(一切都是假的)是:
#include <vector>
#include <numeric>
#include <thread>
struct do_some_work
{
do_some_work(std::vector<int> data)
: _data(std::move(data))
, _current_query(0)
, _last_calculated_result(0)
{}
void update_query(size_t x) {
if (x < _data.size()) {
_current_query = x;
recalculate_result();
}
}
int get_result() const {
return _last_calculated_result;
}
private:
void recalculate_result() {
//dummy some work here
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
_last_calculated_result = std::accumulate(_data.cbegin(), _data.cbegin() + _current_query, 0);
}
std::vector<int> const _data;
size_t _current_query;
int _last_calculated_result;
};
这可以在主代码中使用,例如:
#include <algorithm>
int main()
{
//make some dummy data
std::vector<int> test_data(20, 0);
std::iota(test_data.begin(), test_data.end(), 0);
{
do_some_work work(test_data);
for (size_t i = 0; i < test_data.size(); ++i) {
work.update_query(i);
std::cout << "result = {" << i << "," << work.get_result() << "}" << std::endl;
}
}
}
以上会在main函数中等待很多。
现在,假设我们想在一个紧密的循环中运行这个查询(比如 GUI),并且只关心在查询时快速获得“最近”的结果。
所以,我们想把工作转移到一个单独的线程来计算结果,并更新它,当我们得到结果时,我们得到最后一个计算的结果。也就是说,我们希望更改do_some_work
类以在线程上完成其工作,而更改最少(基本上找到一种可以应用于(大部分)这种类型的任何类的更改模式)。
我对此的尝试如下:
#include <vector>
#include <numeric>
#include <mutex>
#include <thread>
#include <condition_variable>
#include <iostream>
struct do_lots_of_work
{
do_lots_of_work(std::vector<int> data)
: _data(std::move(data))
, _current_query(0)
, _last_calculated_result(0)
, _worker()
, _data_mtx()
, _result_mtx()
, _cv()
, _do_exit(false)
, _work_available(false)
{
start_worker();
}
void update_query(size_t x) {
{
if (x < _data.size()) {
std::lock_guard<std::mutex> lck(_data_mtx);
_current_query = x;
_work_available = true;
_cv.notify_one();
}
}
}
int get_result() const {
std::lock_guard<std::mutex> lck(_result_mtx);
return _last_calculated_result;
}
~do_lots_of_work() {
stop_worker();
}
private:
void start_worker() {
if (!_worker.joinable()) {
std::cout << "starting worker..." << std::endl;
_worker = std::thread(&do_lots_of_work::worker_loop, this);
}
}
void stop_worker() {
std::cout << "worker stopping..." << std::endl;
if (_worker.joinable()) {
std::unique_lock<std::mutex> lck(_data_mtx);
_do_exit = true;
lck.unlock();
_cv.notify_one();
_worker.join();
}
std::cout << "worker stopped" << std::endl;
}
void worker_loop() {
std::cout << "worker started" << std::endl;
while (true) {
std::unique_lock<std::mutex> lck(_data_mtx);
_cv.wait(lck, [this]() {return _work_available || _do_exit; });
if (_do_exit) { break; }
if (_work_available) {
_work_available = false;
int query = _current_query; //take local copy
lck.unlock(); //unlock before doing lots of work.
recalculate_result(query);
}
}
}
void recalculate_result(int query) {
//dummy lots of work here
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
int const result = std::accumulate(_data.cbegin(), _data.cbegin() + query, 0);
set_result(result);
}
void set_result(int result) {
std::lock_guard<std::mutex> lck(_result_mtx);
_last_calculated_result = result;
}
std::vector<int> const _data;
size_t _current_query;
int _last_calculated_result;
std::thread _worker;
mutable std::mutex _data_mtx;
mutable std::mutex _result_mtx;
std::condition_variable _cv;
bool _do_exit;
bool _work_available;
};
用法是(示例):
#include <algorithm>
int main()
{
//make some dummy data
std::vector<int> test_data(20, 0);
std::iota(test_data.begin(), test_data.end(), 0);
{
do_lots_of_work work(test_data);
for (size_t i = 0; i < test_data.size(); ++i) {
work.update_query(i);
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "result = {" << i << "," << work.get_result() << "}" << std::endl;
}
}
}
这似乎有效,给出了最后一个结果,而不是停止主要功能等。
但是,这看起来需要很多更改才能将工作线程添加到一个简单的类中,例如do_some_work
. 像两个mutexes
(一个用于工作人员/主要交互数据,一个用于结果)、一个condition_variable
、一个more-work-available
标志和一个do-exit
标志之类的项目,这是相当多的。我想我们不想要async
一种机制,因为我们不想每次都潜在地启动一个新线程。
现在,我不确定是否有更简单的模式来进行这种改变,但感觉应该有。一种可用于将工作卸载到线程的模式。
所以最后,我的问题是,可以以比上面的实现更简单的方式do_some_work
转换成吗?do_lots_of_work
编辑(解决方案 1)基于 ThreadPool:
使用线程池,可以跳过工作循环,我们需要两个互斥锁,用于结果和查询。锁定更新查询,锁定获取结果,两者都锁定重新计算(获取查询的本地副本,并写入结果)。
注意:另外,在将工作推入队列时,由于我们不关心较旧的结果,我们可以清除工作队列。
示例实现(使用 CTPL 线程池)
#include "CTPL\ctpl_stl.h"
#include <vector>
#include <mutex>
struct do_lots_of_work_with_threadpool
{
do_lots_of_work_with_threadpool(std::vector<int> data)
: _data(std::move(data))
, _current_query(0)
, _last_calculated_result(0)
, _pool(1)
, _result_mtx()
, _query_mtx()
{
}
void update_query(size_t x) {
if (x < _data.size()) {
std::lock_guard<std::mutex> lck(_query_mtx);
_current_query = x;
}
_pool.clear_queue(); //clear as we don't want to calculate any out-date results.
_pool.push([this](int id) { recalculate_result(); });
}
int get_result() const {
std::lock_guard<std::mutex> lck(_result_mtx);
return _last_calculated_result;
}
private:
void recalculate_result() {
//dummy some work here
size_t query;
{
std::lock_guard<std::mutex> lck(_query_mtx);
query = _current_query;
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
int result = std::accumulate(_data.cbegin(), _data.cbegin() + query, 0);
{
std::lock_guard<std::mutex> lck(_result_mtx);
_last_calculated_result = result;
}
}
std::vector<int> const _data;
size_t _current_query;
int _last_calculated_result;
ctpl::thread_pool _pool;
mutable std::mutex _result_mtx;
mutable std::mutex _query_mtx;
};
使用 ThreadPool 和 Atomic 编辑(解决方案 2):
该解决方案将共享变量更改为原子,因此我们不需要任何互斥锁,也不必考虑获取/释放锁等。这更简单并且非常接近原始类(当然假设线程池类型存在于某处它不是标准的一部分)。
#include "CTPL\ctpl_stl.h"
#include <vector>
#include <mutex>
#include <atomic>
struct do_lots_of_work_with_threadpool_and_atomics
{
do_lots_of_work_with_threadpool_and_atomics(std::vector<int> data)
: _data(std::move(data))
, _current_query(0)
, _last_calculated_result(0)
, _pool(1)
{
}
void update_query(size_t x) {
if (x < _data.size()) {
_current_query.store(x);
}
_pool.clear_queue(); //clear as we don't want to calculate any out-date results.
_pool.push([this](int id) { recalculate_result(); });
}
int get_result() const {
return _last_calculated_result.load();
}
private:
void recalculate_result() {
//dummy some work here
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
_last_calculated_result.store(std::accumulate(_data.cbegin(), _data.cbegin() + _current_query.load(), 0));
}
std::vector<int> const _data;
std::atomic<size_t> _current_query;
std::atomic<int> _last_calculated_result;
ctpl::thread_pool _pool;
};
解决方案
推荐阅读
- flutter - Flutter - 如何更改下拉项目列表的大小和对齐方式
- postgresql - 错误请使用 AWS Lambda 无服务器框架手动安装 pg 包
- sql - SQLCMD:使用powershell将带有数字的参数数据库startig传递给脚本
- reactjs - 在 iframe 中启用可编辑的 Google Doc
- mysql - Sequel Pro 连接到 Mysql 8.0 服务器没有列出数据库
- oracle - 如何将转储直接下载并导入到您的 oracle Docker 映像中?
- java - 存在检查构造函数有效输入的标准方法
- javascript - 使用 mongodb 中的 async/await 预保存保存地理编码器坐标
- python - 如何将字符串中的字符与新的 f 格式对齐?
- python - how to insert row matrix to another matrix in python with decimal?