首页 > 解决方案 > 为什么线程池工作缓慢?

问题描述

我有程序使用 N 个线程计算给定目录中所有 .log 文件中的所有单词。
我写了这样的东西。
线程池.h

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include <boost/thread/condition_variable.hpp>
#include <boost/thread.hpp>

#include <future> // I don't how to work with boost future
#include <queue>
#include <vector>
#include <functional>


class ThreadPool
{
public:
    using Task = std::function<void()>; // Our task

    explicit ThreadPool(int num_threads)
    {
        start(num_threads);
    }

    ~ThreadPool()
    {
        stop();
    }

    template<class T>
    auto enqueue(T task)->std::future<decltype(task())>
    {
        // packaged_task wraps any Callable target
        auto wrapper = std::make_shared<std::packaged_task<decltype(task()) ()>>(std::move(task));

        {
            boost::unique_lock<boost::mutex> lock{ mutex_p };
            tasks_p.emplace([=] {
                (*wrapper)();
            });
        }

        event_p.notify_one();

        return wrapper->get_future();
    }

    /*void enqueue(Task task)
    {
        {
            boost::unique_lock<boost::mutex> lock { mutex_p };
            tasks_p.emplace(std::move(task));
            event_p.notify_one();
        }
    }*/

private:
    std::vector<boost::thread> threads_p; // num of threads
    std::queue<Task>           tasks_p;   // Tasks to make
    boost::condition_variable  event_p; 
    boost::mutex               mutex_p;

    bool                       isStop = false;

    void start(int num_threads)
    {
        for (int i = 0; i < num_threads; ++i)
        {
            // Add to the end our thread
            threads_p.emplace_back([=] {
                while (true)
                {
                    // Task to do
                    Task task;

                    {
                        boost::unique_lock<boost::mutex> lock(mutex_p);

                        event_p.wait(lock, [=] { return isStop || !tasks_p.empty(); });

                        // If we make all tasks
                        if (isStop && tasks_p.empty())
                            break;

                        // Take new task from queue
                        task = std::move(tasks_p.front());
                        tasks_p.pop();
                    }

                    // Execute our task
                    task();
                }
            });
        }
    }

    void stop() noexcept
    {
        {
            boost::unique_lock<boost::mutex> lock(mutex_p);
            isStop = true;
        }

        event_p.notify_all();

        for (auto& thread : threads_p)
        {
            thread.join();
        }
    }
};

#endif

主文件

#include "ThreadPool.h"

#include <iostream>
#include <iomanip>
#include <Windows.h>
#include <chrono> 

#include <vector>
#include <map>

#include <boost/filesystem.hpp>
#include <boost/thread.hpp>

#include <locale.h>


namespace bfs = boost::filesystem;

//int count_words(boost::filesystem::ifstream& file)
//{
//  int counter = 0;
//  std::string buffer;
//  while (file >> buffer)
//  {
//      ++counter;
//  }
//  
//  return counter;
//}
//
int count_words(boost::filesystem::path filename)
{
    boost::filesystem::ifstream ifs(filename);
    return std::distance(std::istream_iterator<std::string>(ifs), std::istream_iterator<std::string>());
}

int main(int argc, const char* argv[])
{
    std::cin.tie(0);
    std::ios_base::sync_with_stdio(false);

    bfs::path path = argv[1];
    // If this path is exist and if this is dir
    if (bfs::exists(path) && bfs::is_directory(path))
    {
        // Number of threads. Default = 4
        int n = (argc == 3 ? atoi(argv[2]) : 4);
        ThreadPool pool(n);

        // Container to store all filenames and number of words inside them
        //std::map<bfs::path, std::future<int>> all_files_and_sums;
        std::vector<std::future<int>> futures;
        
        auto start = std::chrono::high_resolution_clock::now();

        // Iterate all files in dir
        for (auto& p : bfs::directory_iterator(path)) {
            // Takes only .txt files
            if (p.path().extension() == ".log") {
                // Future for taking value from here
                auto fut = pool.enqueue([p]() {
                    // In this lambda function I count all words in file and return this value
                    int result = count_words(p.path());
                    static int count = 0;
                    ++count;
                    std::ostringstream oss;
                    oss << count << ". TID, " << GetCurrentThreadId() << "\n";
                    std::cout << oss.str();
                    return result;
                });
                // "filename = words in this .txt file"
                futures.emplace_back(std::move(fut));
            }
        }

        int result = 0;

        for (auto& f : futures)
        {
            result += f.get();
        }

        auto stop = std::chrono::high_resolution_clock::now();

        auto duration = std::chrono::duration_cast<std::chrono::seconds>(stop - start);

        std::cout << "Result: " << result << "\n";
 
        std::cout << duration.count() << '\n';
    }
    else
        std::perror("Dir is not exist");
}

变量 N 为 4(线程数)。我的目录中有 320 个 .log 文件,我需要在这些文件中计算字数。一切正常,但当变量“计数”为 180 时 - 程序停止一段时间然后继续但慢得多。
可能是什么原因?CPU - Xeon e5430(我已经在另一个 CPU 上测试过这个程序 - 结果是一样的)。

标签: c++multithreading

解决方案


这取决于您如何衡量“慢”,但基本上您使用的可能是最糟糕的模型之一:

  1. 一个任务队列在所有线程之间共享。

这种方法的问题是阻塞共享队列上的每个线程。

一个更好的模型是这样的

  1. 任务窃取- 您可以尝试创建一个任务队列 pro 线程,然后使用try_lock(它不会阻塞)启用每个线程从其他线程的任务“窃取”工作,如果它没有其他事情可做。

这在Sean Parent Talk about Concurrency中得到了很好的解释。


推荐阅读