首页 > 解决方案 > 无法通过 std::vector 同步 std::thread

问题描述

代码

我有一个使用线程运行的算法。当算法的几个部分进入特定步骤时,我希望主线程知道这一点。我在 Xubuntu 18.04 下工作。

主应用程序代码遵循此方案

vector<thread> threads;
vector<bool> flags;
bool finish = false;
cout << "Create" << endl;
create_threads(threads, flags, finish);
cout << "Finish" << endl;
finish = true;
cout << "Destroy" << endl;
destroy_threads(threads);
cout << "End" << endl;
return 0;

每个线程循环直到finishis true (它由外部的主线程更改)。此外,在创建线程后,我通过分配检查它们是否已到达特定步骤flags[thread_number] = true

void create_threads(vector<thread>& threads, vector<bool>& flags, bool& finish)
{
    auto command = [&finish, &flags](unsigned thread_number)
    {
        while (!finish)
        {
            flags[thread_number] = true;
        }
    };
    for (unsigned i = 0; i < threads_count; ++i)
    {
        flags.push_back(false);
        threads.push_back(move(thread{command, i}));
    }
    // BUG: Freeze is here
    wait_threads(flags);
    // BUG: Freeze is here
}

我有一个检查功能。它只是等到flags向量的所有元素都是true. 在线程执行之前,它们都是false,并且只有线程修改它们。

void wait_threads(vector<bool>& flags)
{
    while (true)
    {
        if (all_of(flags.begin(), flags.end(), [](bool value) { return value; }))
        {
            return;
        }
    }
}

整个代码示例如下所示,可以使用clang++ sync.cpp -lpthread -o sync

#include <algorithm>
#include <thread>
#include <vector>
#include <iostream>
#include <utility>

using namespace std;

const unsigned threads_count = 2;

/**
 * Wait for threads to start.
 * Every thread has an access to the `flags` vector.
 * After the initialization each thread assigns `true`
 * to each component of the vector.
 *
 * Though, with `-O2` and `-O3` optimization flags
 * the function `wait_threads` freezes,
 * like the elements don't change from `false` to `true`.
 */
void wait_threads(vector<bool>& flags)
{
    while (true)
    {
        if (all_of(flags.begin(), flags.end(), [](bool value) { return value; }))
        {
            return;
        }
    }
}

/**
 * Create threads.
 * After the launch, each thread assigns `true` to the corresponding `flag` cell.
 * Also, the threads watch `finish` variable to stop when it's `true`.
 */
void create_threads(vector<thread>& threads, vector<bool>& flags, bool& finish)
{
    auto command = [&finish, &flags](unsigned thread_number)
    {
        while (!finish)
        {
            flags[thread_number] = true;
        }
    };
    for (unsigned i = 0; i < threads_count; ++i)
    {
        flags.push_back(false);
        threads.push_back(move(thread{command, i}));
    }
    // BUG: Freeze is here
    wait_threads(flags);
    // BUG: Freeze is here
}

/**
 * Wait until all threads finish.
 */
void destroy_threads(vector<thread>& threads)
{
    for (auto& el : threads)
    {
        el.join();
    }
}

int main()
{
    vector<thread> threads;
    vector<bool> flags;
    bool finish = false;
    cout << "Create" << endl;
    create_threads(threads, flags, finish);
    cout << "Finish" << endl;
    finish = true;
    cout << "Destroy" << endl;
    destroy_threads(threads);
    cout << "End" << endl;
    return 0;
}

问题

当我在没有任何优化的情况下编译代码时,它运行良好。如果我使用-O2,-O3或,它会在通话中-Ofast冻结。wait_threads看起来编译器看到在主线程flags中填充了向量,并记住了它。false它甚至发生在const unsigned threads_count = 1;. 我试过使用vector< atomic<unsigned> >,但我发现了如何在 C++ 中声明原子向量的问题,并且明白这很棘手,不是解决问题的正确方法。

附录(真实代码)

签入真实代码更复杂。这是真正的等待功能

void wait_threads_change(
    ULONG beginning,
    vector<ULONG>* last_changes,
    vector<ULONG>* last_checks
)
{
    while (*std::min_element(last_checks->begin(), last_checks->end()) <= beginning)
    {
        // cout << flush; // WORKAROUND: This helps to avoid freeze
    }
    while (
        (*std::min_element(last_checks->begin(), last_checks->end())
        <= *std::max_element(last_changes->begin(), last_changes->end()))
    )
    {
    }
}

和真正的线程创建

void run_csp_threads(
    struct ConstraintGraph *graph,
    vector<thread>* threads,
    vector<ULONG>* last_changes,
    vector<ULONG>* last_checks,
    BOOL& finish
)
{
    const ULONG THREADS = 7;
    auto command = ([graph, THREADS, last_changes, last_checks, &finish](ULONG thread_number)
    {
        while (!finish)
        {
            if (csp_solution_iteration(graph, THREADS, thread_number))
            {
                do
                {
                    (*last_changes)[thread_number] = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
                } while (csp_solution_iteration(graph, THREADS, thread_number));
            }
            else
            {
                (*last_checks)[thread_number] = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
            }
        }
    });
    ULONG beginning = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
    for (ULONG thread_number = 0; thread_number < THREADS; ++thread_number)
    {
        last_changes->push_back(beginning);
        last_checks->push_back(beginning);
        threads->push_back(move(thread{command, thread_number}));
    }
    wait_threads_change(beginning, last_changes, last_checks);
}

对于这个问题,我试图尽可能地简化代码,以了解如何解决至少一个问题。

标签: c++multithreadingstdcompiler-optimizationthread-synchronization

解决方案


推荐阅读