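c++ - Cannot synchronize std::thread objects through a std::vector
Problem description
Code
I have an algorithm that runs on threads. When several parts of the algorithm reach a particular step, I want the main thread to know about it. I am working on Xubuntu 18.04.
The main application code follows this scheme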
vector<thread> threads;
vector<bool> flags;
bool finish = false;
cout << "Create" << endl;
create_threads(threads, flags, finish);
cout << "Finish" << endl;
finish = true;
cout << "Destroy" << endl;
destroy_threads(threads);
cout << "End" << endl;
return 0;
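Each thread loops until finish is true (it is changed from outside, by the main thread). In addition, once the threads have been created, I check whether they have reached a particular step; each thread signals this by assigning flags[thread_number] = true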
void create_threads(vector<thread>& threads, vector<bool>& flags, bool& finish)
{
    auto command = [&finish, &flags](unsigned thread_number)
    {
        while (!finish)
        {
            flags[thread_number] = true;
        }
    };
    for (unsigned i = 0; i < threads_count; ++i)
    {
        flags.push_back(false);
        threads.push_back(move(thread{command, i}));
    }
    // BUG: Freeze is here
    wait_threads(flags);
    // BUG: Freeze is here
}
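I have a checking function. It simply waits until all elements of the flags vector are true. Before the threads run, they are all false, and only the threads modify them.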
void wait_threads(vector<bool>& flags)
{
    while (true)
    {
        if (all_of(flags.begin(), flags.end(), [](bool value) { return value; }))
        {
            return;
        }
    }
}
The whole code sample is shown below; it can be compiled with clang++ sync.cpp -lpthread -o sync
#include <algorithm>
#include <thread>
#include <vector>
#include <iostream>
#include <utility>

using namespace std;

const unsigned threads_count = 2;

/**
 * Wait for threads to start.
 * Every thread has an access to the `flags` vector.
 * After the initialization each thread assigns `true`
 * to each component of the vector.
 *
 * Though, with `-O2` and `-O3` optimization flags
 * the function `wait_threads` freezes,
 * like the elements don't change from `false` to `true`.
 */
void wait_threads(vector<bool>& flags)
{
    while (true)
    {
        if (all_of(flags.begin(), flags.end(), [](bool value) { return value; }))
        {
            return;
        }
    }
}

/**
 * Create threads.
 * After the launch, each thread assigns `true` to the corresponding `flag` cell.
 * Also, the threads watch `finish` variable to stop when it's `true`.
 */
void create_threads(vector<thread>& threads, vector<bool>& flags, bool& finish)
{
    auto command = [&finish, &flags](unsigned thread_number)
    {
        while (!finish)
        {
            flags[thread_number] = true;
        }
    };
    for (unsigned i = 0; i < threads_count; ++i)
    {
        flags.push_back(false);
        threads.push_back(move(thread{command, i}));
    }
    // BUG: Freeze is here
    wait_threads(flags);
    // BUG: Freeze is here
}

/**
 * Wait until all threads finish.
 */
void destroy_threads(vector<thread>& threads)
{
    for (auto& el : threads)
    {
        el.join();
    }
}

int main()
{
    vector<thread> threads;
    vector<bool> flags;
    bool finish = false;
    cout << "Create" << endl;
    create_threads(threads, flags, finish);
    cout << "Finish" << endl;
    finish = true;
    cout << "Destroy" << endl;
    destroy_threads(threads);
    cout << "End" << endl;
    return 0;
}
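Problem
When I compile the code without any optimization, it works fine. If I use -O2, -O3, or -Ofast, it freezes in the wait_threads call.
It looks like the compiler sees that the flags vector is filled with false in the main thread and remembers that.
It happens even with const unsigned threads_count = 1;. I tried using vector< atomic<unsigned> >, but I ran into the question "How to declare a vector of atomic in C++" and concluded that it is tricky and not the right way to solve the problem.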
Appendix (real code)
The check in the real code is more complex. Here is the real waiting function
void wait_threads_change(
    ULONG beginning,
    vector<ULONG>* last_changes,
    vector<ULONG>* last_checks
)
{
    while (*std::min_element(last_checks->begin(), last_checks->end()) <= beginning)
    {
        // cout << flush; // WORKAROUND: This helps to avoid freeze
    }
    while (
        (*std::min_element(last_checks->begin(), last_checks->end())
        <= *std::max_element(last_changes->begin(), last_changes->end()))
    )
    {
    }
}
And the real thread creation
void run_csp_threads(
    struct ConstraintGraph *graph,
    vector<thread>* threads,
    vector<ULONG>* last_changes,
    vector<ULONG>* last_checks,
    BOOL& finish
)
{
    const ULONG THREADS = 7;
    auto command = ([graph, THREADS, last_changes, last_checks, &finish](ULONG thread_number)
    {
        while (!finish)
        {
            if (csp_solution_iteration(graph, THREADS, thread_number))
            {
                do
                {
                    (*last_changes)[thread_number] = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
                } while (csp_solution_iteration(graph, THREADS, thread_number));
            }
            else
            {
                (*last_checks)[thread_number] = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
            }
        }
    });
    ULONG beginning = std::chrono::duration_cast<std::chrono::nanoseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
    for (ULONG thread_number = 0; thread_number < THREADS; ++thread_number)
    {
        last_changes->push_back(beginning);
        last_checks->push_back(beginning);
        threads->push_back(move(thread{command, thread_number}));
    }
    wait_threads_change(beginning, last_changes, last_checks);
}
For this question, I tried to simplify the code as much as possible, to understand how to solve at least one of the problems.
Solution
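One possible way to let the main thread learn that every worker has reached a given step, without spinning on shared flags, is a counter protected by a mutex and paired with a condition variable. The sketch below is an illustration under assumptions, not an answer taken from the original thread. `StartGate`, `arrive`, `wait_until_arrived`, and `run_with_gate` are hypothetical names, and the worker body is reduced to the arrival step (the `finish` loop from the question is omitted).

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper type (not from the original post): counts how many
// workers have reached the step the main thread is waiting for.
class StartGate
{
public:
    // Called once by each worker when it reaches the step.
    void arrive()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            ++arrived_;
        }
        changed_.notify_all();
    }

    // Blocks the caller until at least `expected` workers have arrived.
    void wait_until_arrived(std::size_t expected)
    {
        std::unique_lock<std::mutex> lock(mutex_);
        changed_.wait(lock, [&] { return arrived_ >= expected; });
    }

    // Returns the current arrival count.
    std::size_t arrived()
    {
        std::lock_guard<std::mutex> lock(mutex_);
        return arrived_;
    }

private:
    std::mutex mutex_;
    std::condition_variable changed_;
    std::size_t arrived_ = 0;
};

// Starts `count` workers, sleeps until all of them have arrived,
// joins them, and returns the final arrival count.
std::size_t run_with_gate(std::size_t count)
{
    StartGate gate;
    std::vector<std::thread> threads;
    threads.reserve(count);
    for (std::size_t i = 0; i < count; ++i)
    {
        threads.emplace_back([&gate] { gate.arrive(); });
    }

    gate.wait_until_arrived(count);

    for (auto& t : threads)
    {
        t.join();
    }
    return gate.arrived();
}
```

Compared with polling, the waiting thread here sleeps until it is notified, and the mutex provides the synchronization that plain `bool` or `vector<bool>` elements lack.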