c++ - 为什么线程池工作缓慢?
问题描述
我有程序使用 N 个线程计算给定目录中所有 .log 文件中的所有单词。
我写了这样的东西。
线程池.h
#ifndef THREAD_POOL_H
#define THREAD_POOL_H
#include <boost/thread/condition_variable.hpp>
#include <boost/thread.hpp>
#include <future> // I don't how to work with boost future
#include <queue>
#include <vector>
#include <functional>
class ThreadPool
{
public:
using Task = std::function<void()>; // Our task
explicit ThreadPool(int num_threads)
{
start(num_threads);
}
~ThreadPool()
{
stop();
}
template<class T>
auto enqueue(T task)->std::future<decltype(task())>
{
// packaged_task wraps any Callable target
auto wrapper = std::make_shared<std::packaged_task<decltype(task()) ()>>(std::move(task));
{
boost::unique_lock<boost::mutex> lock{ mutex_p };
tasks_p.emplace([=] {
(*wrapper)();
});
}
event_p.notify_one();
return wrapper->get_future();
}
/*void enqueue(Task task)
{
{
boost::unique_lock<boost::mutex> lock { mutex_p };
tasks_p.emplace(std::move(task));
event_p.notify_one();
}
}*/
private:
std::vector<boost::thread> threads_p; // num of threads
std::queue<Task> tasks_p; // Tasks to make
boost::condition_variable event_p;
boost::mutex mutex_p;
bool isStop = false;
void start(int num_threads)
{
for (int i = 0; i < num_threads; ++i)
{
// Add to the end our thread
threads_p.emplace_back([=] {
while (true)
{
// Task to do
Task task;
{
boost::unique_lock<boost::mutex> lock(mutex_p);
event_p.wait(lock, [=] { return isStop || !tasks_p.empty(); });
// If we make all tasks
if (isStop && tasks_p.empty())
break;
// Take new task from queue
task = std::move(tasks_p.front());
tasks_p.pop();
}
// Execute our task
task();
}
});
}
}
void stop() noexcept
{
{
boost::unique_lock<boost::mutex> lock(mutex_p);
isStop = true;
}
event_p.notify_all();
for (auto& thread : threads_p)
{
thread.join();
}
}
};
#endif
主文件
#include "ThreadPool.h"
#include <iostream>
#include <iomanip>
#include <Windows.h>
#include <chrono>
#include <vector>
#include <map>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <locale.h>
namespace bfs = boost::filesystem;
//int count_words(boost::filesystem::ifstream& file)
//{
// int counter = 0;
// std::string buffer;
// while (file >> buffer)
// {
// ++counter;
// }
//
// return counter;
//}
//
int count_words(boost::filesystem::path filename)
{
boost::filesystem::ifstream ifs(filename);
return std::distance(std::istream_iterator<std::string>(ifs), std::istream_iterator<std::string>());
}
int main(int argc, const char* argv[])
{
std::cin.tie(0);
std::ios_base::sync_with_stdio(false);
bfs::path path = argv[1];
// If this path is exist and if this is dir
if (bfs::exists(path) && bfs::is_directory(path))
{
// Number of threads. Default = 4
int n = (argc == 3 ? atoi(argv[2]) : 4);
ThreadPool pool(n);
// Container to store all filenames and number of words inside them
//std::map<bfs::path, std::future<int>> all_files_and_sums;
std::vector<std::future<int>> futures;
auto start = std::chrono::high_resolution_clock::now();
// Iterate all files in dir
for (auto& p : bfs::directory_iterator(path)) {
// Takes only .txt files
if (p.path().extension() == ".log") {
// Future for taking value from here
auto fut = pool.enqueue([p]() {
// In this lambda function I count all words in file and return this value
int result = count_words(p.path());
static int count = 0;
++count;
std::ostringstream oss;
oss << count << ". TID, " << GetCurrentThreadId() << "\n";
std::cout << oss.str();
return result;
});
// "filename = words in this .txt file"
futures.emplace_back(std::move(fut));
}
}
int result = 0;
for (auto& f : futures)
{
result += f.get();
}
auto stop = std::chrono::high_resolution_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(stop - start);
std::cout << "Result: " << result << "\n";
std::cout << duration.count() << '\n';
}
else
std::perror("Dir is not exist");
}
变量 N 为 4(线程数)。我的目录中有 320 个 .log 文件,我需要在这些文件中计算字数。一切正常,但当变量“计数”为 180 时 - 程序停止一段时间然后继续但慢得多。
可能是什么原因?CPU - Xeon e5430(我已经在另一个 CPU 上测试过这个程序 - 结果是一样的)。
解决方案
这取决于您如何衡量“慢”,但基本上您使用的可能是最糟糕的模型之一:
- 一个任务队列在所有线程之间共享。
这种方法的问题是阻塞共享队列上的每个线程。
一个更好的模型是这样的
- 任务窃取- 您可以尝试创建一个任务队列 pro 线程,然后使用try_lock(它不会阻塞)启用每个线程从其他线程的任务“窃取”工作,如果它没有其他事情可做。
这在Sean Parent Talk about Concurrency中得到了很好的解释。
推荐阅读
- tfs - Dotnet 框架 - 在 TFS cd 管道中使用变量替换更改 Nlog 配置
- javascript - Mongoose:查找所有给出转换错误的文档
- javascript - Firebase Cloud Functions:为什么对外部 URL 的某些请求会失败?
- ansible - Ansible - 检查列表中的元素是否在另一个字典中没有相同的值
- python - 如何检查元素是否存在,如果存在则关闭 selenium 驱动程序,如果它不继续使用 Python 中的脚本
- laravel - Laravel 8日期时区没有改变
- python - ___ 后面的 model.summary 中的 None 是什么意思?
- python - ipyvuetify 按钮并排显示
- flutter - 如何将新项目添加到选择文本菜单?
- javascript - 在 Javascript 中运行新的“转换为图层”命令