c++ - 如何等到池中的所有线程结束工作?
问题描述
我正在尝试使用 boost 库实现简单的线程池。
这是代码:
//boost::asio::io_service ioService;
//boost::thread_group pool;
//boost::asio::io_service::work* worker;
ThreadPool::ThreadPool(int poolSize /*= boost::thread::hardware_concurrency()*/)
{
if (poolSize >= 1 && poolSize <= boost::thread::hardware_concurrency())
threadAmount = poolSize;
else
threadAmount = 1;
worker = NULL;
}
ThreadPool::~ThreadPool()
{
if (worker != NULL && !ioService.stopped())
{
_shutdown();
delete worker;
worker = NULL;
}
}
void ThreadPool::start()
{
if (worker != NULL)
{
return;
}
worker = new boost::asio::io_service::work(ioService);
for (int i = 0; i < threadAmount; ++i)
{
pool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
}
}
template<class F, class...Args>
void ThreadPool::execute(F f, Args&&... args)
{
ioService.post(boost::bind(f, std::forward<Args>(args)...));
}
void ThreadPool::shutdown()
{
pool.interrupt_all();
_shutdown();
}
void ThreadPool::join_all()
{
// wait for all threads before continue
// in other words - barier for all threads when they finished all jobs
// and to be able re-use them in futur.
}
void ThreadPool::_shutdown()
{
ioService.reset();
ioService.stop();
}
在我的程序中,我将一些需要完成的任务分配给线程池,并进一步使用主线程。在某些时候,我需要等待所有线程完成所有任务,然后才能继续计算。有没有办法做到这一点?
非常感谢。
解决方案
正如其他人指出的那样,罪魁祸首是work
实例。
我会大大简化接口(实际上没有理由在析构函数中拆分shutdown
为shutdown
,_shutdown
和join_all
一些随机逻辑。这只会让人很难知道责任在哪里。
界面应该是一个成功的坑- 易于使用正确,难以使用错误。
同时,它使正确实现它变得更加容易。
这是第一个刺:
#include <boost/asio.hpp>
#include <boost/thread.hpp>
namespace ba = boost::asio;
struct ThreadPool {
ThreadPool(unsigned poolSize = boost::thread::hardware_concurrency());
~ThreadPool();
void start();
template <typename F, typename... Args>
void execute(F f, Args&&... args) {
ioService.post(std::bind(f, std::forward<Args>(args)...));
}
private:
unsigned threadAmount;
ba::io_service ioService;
boost::thread_group pool;
std::unique_ptr<ba::io_service::work> work;
void shutdown();
};
ThreadPool::ThreadPool(
unsigned poolSize /*= boost::thread::hardware_concurrency()*/) {
threadAmount = std::max(1u, poolSize);
threadAmount = std::min(boost::thread::hardware_concurrency(), poolSize);
}
ThreadPool::~ThreadPool() {
shutdown();
}
void ThreadPool::start() {
if (!work) {
work = std::make_unique<ba::io_service::work>(ioService);
for (unsigned i = 0; i < threadAmount; ++i) {
pool.create_thread(
boost::bind(&ba::io_service::run, &ioService));
}
}
}
void ThreadPool::shutdown() {
work.reset();
pool.interrupt_all();
ioService.stop();
pool.join_all();
ioService.reset();
}
#include <iostream>
using namespace std::chrono_literals;
int main() {
auto now = std::chrono::high_resolution_clock::now;
auto s = now();
{
ThreadPool p(10);
p.start();
p.execute([] { std::this_thread::sleep_for(1s); });
p.execute([] { std::this_thread::sleep_for(600ms); });
p.execute([] { std::this_thread::sleep_for(400ms); });
p.execute([] { std::this_thread::sleep_for(200ms); });
p.execute([] { std::this_thread::sleep_for(10ms); });
}
std::cout << "Total elapsed: " << (now() - s) / 1.0s << "s\n";
}
在大多数多核系统上,哪个会打印出类似我的东西:
Total elapsed: 1.00064s
如果超过 hardware_concurrency
threadAmount
,您在计算要采取的位置时似乎有错误。1
poolSize
老实说,为什么要bind
在执行中?它真的不会增加很多,你可以把它留给调用者,他们可以选择是否使用绑定,如果是,是boost::bind
,std::bind
还是其他一些组合可调用对象的方式:
template <typename F>
void execute(F f) { ioService.post(f); }
您缺少围绕io_service::run
调用的异常处理(请参阅是否应该捕获 boost::asio::io_service::run() 引发的异常?)。
如果您使用的是最新的 boost 版本,则可以使用较新的io_context
和thread_pool
接口,大大简化了事情:
#include <boost/asio.hpp>
struct ThreadPool {
ThreadPool(unsigned poolSize)
: pool(std::clamp(poolSize, 1u, std::thread::hardware_concurrency()))
{ }
template <typename F>
void execute(F f) { post(pool, f); }
private:
boost::asio::thread_pool pool;
};
这仍然具有 99% 的功能¹,但在 10 LoC 中。
事实上,这个类已经变成了一个简单的包装器,所以我们可以这样写:
#include <boost/asio.hpp>
#include <iostream>
using namespace std::chrono_literals;
using C = std::chrono::high_resolution_clock;
static void sleep_for(C::duration d) { std::this_thread::sleep_for(d); }
int main() {
auto s = C::now();
{
boost::asio::thread_pool pool;
post(pool, [] { sleep_for(1s); });
post(pool, [] { sleep_for(600ms); });
// still can bind if you want
post(pool, std::bind(sleep_for, 400ms));
post(pool, std::bind(sleep_for, 200ms));
post(pool, std::bind(sleep_for, 10ms));
//pool.join(); // implicit in destructor
}
std::cout << "Total elapsed: " << (C::now() - s) / 1.0s << "s\n";
}
主要区别在于默认池大小:它是 2*硬件并发(但也更安全地计算,因为并非所有平台都有可靠的 hardware_concurrency() - 它可能是零,例如)。
¹它目前不行使中断点
推荐阅读
- npm - 如何使用 yarn 管理客户端依赖项?
- python - TensorFlow pycharm 中的 GPU 警告
- c# - 当高度绑定到 StackPanel 时,UserControl 中的文本框正在锁定应用程序
- python - 您如何通过 Flask 中的 Stripe Checkout 流程携带像 userID 这样的变量?
- java - 试图从arraylist打印信息,但我认为我正在获取内存
- javascript - JQuery滑块不与用d3垂直绘制的数据交互
- html - 如何在 CSS 中堆叠表格中仅一行的 TD 元素?
- linear-programming - 如何在 GPL 线性规划 (Gusek) 中将变量绑定为负数?
- javascript - 为什么 CSS 过渡不会在 ReactJS 中正确呈现?
- c# - 从存储库中提取时,Visual Studio 2017 丢失了 WPF 的引用