首页 > 解决方案 > 具有组独占控制并允许组内并发的 C++14 多线程

问题描述

您好,我在一个多线程场景中遇到问题:

假设我有 3 个线程在一种函数(命名为 node_a)上运行,3 个线程在另一种函数(命名为 node_b)上运行。每个线程处理不同的输入数据。他们都在某个时间访问共享资源。

现在我要控制这些线程的访问:3个node_a线程被认为是grp_a,3个node_b线程被认为是grp_b。grp_a 和 grp_b 在访问共享资源时是互斥的,但 grp_a 和 grp_b 本身可以同时运行。

这是我的方法:

但事实证明,我没有通过打印每个访问共享资源的线程的时间戳来获得独占性。

下面是我的 C++14 代码:


    std::mutex mtx;
    std::shared_timed_mutex mtx_a,mtx_b;
    atomic_int cnt_a{0},cnt_b{0};
    
    //code for node_a 
      mtx_a.lock_shared();//will block if lane holds the lock.+1
      bool a=0;
      while(cnt_a==0)  { //check if node_b threads are not running
        if(mtx_b.try_lock()) {
          std::unique_lock<std::mutex> lck(mtx);
          cnt_a++;//multiple threads change it, need protection!
          a=1;
        }
      }
    
    /*
    do some work here!!!!
    */
    
      mtx_a.unlock_shared();//-1
      {
        std::unique_lock<std::mutex> lck(mtx);
        --cnt_a;
      }
      if(a){
        mtx_a.lock(); //block until all node_a threads are done
        mtx_b.unlock(); //can only be unlocked by current thread
        cnt_a=0;
        mtx_a.unlock();
      }

node_b 的类似代码

有谁知道是什么问题?谢谢

标签: c++multithreadingc++14mutex

解决方案


您的实施存在一些问题。

  • 使用定时互斥是可疑的
  • 每当您有一个线程应该唤醒以响应某个事件时,应该涉及condition_variables、 s 或其他一些阻塞同步。future现在显然不是使用自旋锁的时候。
  • 必须用互斥锁保护原子变量并不是一个好兆头。

我建议您将程序视为状态机。它可以处于以下三种状态之一:

  1. 不工作(空闲)
  2. 执行 a 组的工作。
  3. 执行 b 组的工作。

每当一个线程想要获取资源时,程序当前处于的这些状态中的哪一个决定了该线程是被允许继续进行还是等待状态回到空闲状态。

因此,就同步资源而言,您应该需要:

  • 用于跟踪状态机当前状态的枚举
  • 了解当前有int多少线程将机器“保持”在当前状态
  • Amutex同步访问它们。
  • Acondition_variable等到我们回去闲着。

这应该就是所需要的。而已。

就详细实现而言,它可能大致如下所示:(使用 RAII 以保证干净的簿记)

// Which state the program is in.
enum class MyState {
  idle,
  a,
  b
};
MyState current_state = MyState::idle;

// How many threads of either a or b are holding the resource.
int current_state_count = 0;

// Protects access to current state and count
std::mutex state_mtx;

// Will be notified whenever the state goes back to idle.
std::condition_variable state_cv;


struct my_res_lock {
  my_res_lock(MyState s) {
    assert(s != MyState::idle);

    // Wait until the current state is either idle, or the one we want
    std::unique_lock<std::mutex> lk(state_mtx);
    state_cv.wait(lk, [s]{return current_state == MyState::idle || current_state == s;});

    // If we were in idle, transition to the desired state.
    current_state = s; 
  
    current_state_count += 1;
  }

  ~my_res_lock() {
    std::lock_guard lk(state_mtx);
    current_state_count -= 1;
    if( current_state_count == 0 ) {
      current_state = MyState::idle;

      // whenever we go back to idle, wake up all waiting threads.
      state_cv.notify_all();
    }
  }
};

// in thread A:
{
  my_res_lock res_lock(MyState::a);

  // do some work!
}

// in thread B:
{
  my_res_lock res_lock(MyState::b);

  // do some work!
}

推荐阅读