首页 > 解决方案 > C11 跨多个类和线程共享线程安全 Q

问题描述

我对 C++ 很陌生,对使用互斥锁也很陌生。我正在尝试通过@ChewOnThis_Trident 从这个答案中实现一个线程安全队列。

本质上,我有不同的线程将消息添加到队列中,我需要保留它们被添加的顺序。但是,这些消息需要在添加之前进行一些有条件的修改。在单独线程上的真实代码监听器调用独特的“handleMessage”函数,在将消息添加到队列之前修改消息。一个单独的线程检查消息是否在队列中并按顺序处理它们。在完整的代码中,我知道侦听器正在以正确的顺序接收消息,但他们未能以正确的顺序将它们添加到队列中。

我认为问题是在收到消息和修改消息之间有一段时间,导致消息乱序。

出于实际代码中的实际原因,我无法在“Safequeue::enqueue”中进行这些修改。

在我的示例中,两个线程可以添加到队列中。一个线程从中读取。在这种情况下,“消息”是一个随机整数。“UsesQ”处理添加到队列和消息修改(例如,使所有整数都为奇数)。

我认为调用“UsesQ::addQ”时需要另一个互斥锁,但它需要在所有线程之间共享,我不确定我是否不确定如何实现它。

在示例中,我正在努力思考一种测试顺序是否正确的方法。

这是示例:

#include <queue>
#include <mutex>
#include <condition_variable>
#include <stdio.h>      
#include <stdlib.h>  
#include <iostream>   
#include <assert.h>
#include <pthread.h>
#include <unistd.h>


class SafeQueue
{// A threadsafe-queue.
public:
  SafeQueue(void)
    : q()
    , m()
    , cv()
  {}

  ~SafeQueue(void)
  {}


  // Add an element to the queue.
  void enqueue(int i)
  {  
    std::lock_guard<std::mutex> lock(m);
    q.push(i);
    cv.notify_one();
  }

  // Get the "front"-element.
  // If the queue is empty, wait till a element is avaiable.
  int dequeue(void)
  {
    std::unique_lock<std::mutex> lock(m);
    while(q.empty())
    {
      // release lock as long as the wait and reaquire it afterwards.
      cv.wait(lock);
    }
    int val = q.front();
    q.pop();
    return val;
  }

private:
  std::queue<int> q;
  mutable std::mutex m;
  std::condition_variable cv;
 
};


class UsesQ
{
    private:
    int readVal;
    int lastReadVal = 1;

    public:
    SafeQueue & Q;
    UsesQ(SafeQueue & Q): Q(Q){};
    ~UsesQ(){};
    void addQ(int i)
    {
      if(i% 2 == 0)
      {
        i++;//some conditional modification to the initial "message"
      }
      Q.enqueue(i);
    }
    void removeQ()
    {
      readVal = Q.dequeue();
    }
};

void* run_add(void* Ptr)
{
    UsesQ * UsesQPtr = (UsesQ *)Ptr;
    for(;;)
    {
    int i = rand();//simulate an incoming "message"
        UsesQPtr->addQ(i);
    }
    pthread_exit (NULL);
    return NULL;
}

void* run_remove(void* Ptr)
{
    UsesQ * UsesQPtr = (UsesQ *)Ptr;
    for(;;)
    {
        UsesQPtr->removeQ();
    }
    pthread_exit (NULL);
    return NULL;
}

int main()
{

  SafeQueue Q;

  UsesQ * UsesQPtr = new UsesQ(std::ref(Q));

    pthread_t thread1;
    pthread_create(&thread1, NULL, run_add, UsesQPtr); 

  pthread_t thread2;
    pthread_create(&thread2, NULL, run_add, UsesQPtr); 

  pthread_t thread3;
    pthread_create(&thread3, NULL, run_remove, UsesQPtr); 

  while(1)
  {
    usleep(1);
    printf(".\n");
  }

};

符合 pthread 标签

g++ main.cpp -pthread

感谢您的任何帮助。

标签: c++multithreadingmutexmessage-queuerace-condition

解决方案


推荐阅读