首页 > 解决方案 > 为什么在其他线程中有作业并等待它们会导致整个应用程序等待?

问题描述

我有一个线程池

struct ThreadPool
{
    using Task = std::packaged_task<void()>;

    explicit ThreadPool(size_t workersCount)
    {
        workers.reserve(workersCount);
        for(uint32_t i = 0u; i < workersCount; ++i) {
            workers.emplace_back([=]() {
                while(true) {
                    Task result;
                    {
                        std::unique_lock<std::mutex> locker(mutex);
                        condition.wait(locker, [=]() { return stop || !tasks.empty(); });

                        if(stop && tasks.empty()) {
                            break;
                        }

                        result = std::move(tasks.front());
                        tasks.pop();
                    }

                    result();
                }
            });
        }
    }

    ~ThreadPool() noexcept
    {
        stop = true;

        condition.notify_all();

        for(auto& worker : workers) {
            worker.join();
        } workers.clear();
    }

    template<typename T>
    inline auto Enqueue(T task)->std::future<decltype(task())>
    {
        auto package = std::packaged_task<decltype(task())()>(std::move(task));
        auto result = package.get_future();
        {
            std::unique_lock<std::mutex> locker(mutex);
            tasks.emplace(std::move(package));
        }

        condition.notify_one();

        return result;
    }

    std::vector<std::thread> workers;

    std::queue<Task> tasks;
    std::mutex mutex;
    std::condition_variable condition;
    std::atomic_bool stop = false;
};

这个例子

//just for the example this is a global
static ThreadPool pool{4};

struct DoSomethingStruct
{
    void DoSomething()
    {
        std::vector<std::future<void>> futures;

        for(uint32_t i = 0; i < 10; ++i) {
            futures.push_back(pool.Enqueue([this, i]() {
                ints.push_back(i);
            }));
        }

        for(const auto& future : futures) {
            future.wait();
        }
    }

    std::vector<int> ints;
};

int main()
{
    DoSomethingStruct dss;

    std::vector<std::future<void>> futures;

    for(uint32_t i = 0; i < 10; ++i) {
        futures.push_back(pool.Enqueue([&dss, i]() {
            dss.DoSomething();
        }));
    }

    for(const auto& future : futures) {
        future.wait();
    }

    return 0;
}

当我运行应用程序时,它永远不会结束。上面的示例实际上并未提供实际用例。我想知道为什么它不等待 10 个期货,DoSomethingStruct::DoSomthing();然后主要等待 10 个其他工作。

我想做一些类似于这个人所做的事情https://wickedengine.net/2018/11/24/simple-job-system-using-standard-c/,但使用期货、互斥锁和条件变量。

这是为什么?我做错了什么?

标签: c++multithreading

解决方案


首先,您的池创建 4 个工作线程。然后,在 中main,您将一些任务添加到池队列中,该队列调用dss.DoSomething();.

然后工人开始执行这些任务。在内部,他们首先将更多任务排入队列,然后,他们开始永远等待他们的未来。这些等待永远不会结束,因为没有线程可以开始解决下一个排队的任务。

创建一个能够从已处理任务中将任务排入队列的线程池并非易事。基本上,您需要在这里暂停当前任务而不是等待。在 C++ 中没有对此的本机机制(至少在 C++20 协程之前)。

作为一种解决方法,您可以使用 OpenMP 或 Intel TBB,它们都提供了所描述的功能。例如,在 OpenMP 中,您可以暂停当前任务并等待其子任务完成#pragma omp takswait


推荐阅读