首页 > 解决方案 > 我的生产者消费者代码中是否存在死锁或竞争条件?

问题描述

我正在尝试在 C++ 中实现生产者 <-> 消费者模式。当我读到这种模式时,他们似乎总是提到必须避免的潜在死锁。但是,我在下面没有使用任何互斥体就实现了这一点。我的代码有什么问题?

#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <atomic>

class CircularBuffer
{
public:
    CircularBuffer();
    int*          getWritePos();
    void      finishedWriting();
    int*           getReadPos();
    void      finishedReading();
private:
    void waitForReaderToCatchUp();
    void waitForWriterToCatchUp();

    const int size = 5;
    std::vector<int> data;
    // Changed from int since these variables are shared between the two threads and assignment is not necessarily atomic: 
    std::atomic<int> writePos = 0;
    std::atomic<int> readPos = 0;
};

CircularBuffer::CircularBuffer() {
    data.resize(size);
}

void
CircularBuffer::waitForReaderToCatchUp() {
    int unread = writePos - readPos;
    while (unread >= size) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getWritePos() {
    waitForReaderToCatchUp();
    int pos = writePos % size;
    return &data[pos];
}

void
CircularBuffer::finishedWriting() {
    writePos++;
}

void
CircularBuffer::waitForWriterToCatchUp() {
    int unread = writePos - readPos;
    while (unread < 1) {
        std::this_thread::sleep_for(std::chrono::nanoseconds(10));
        unread = writePos - readPos;
    }
}

int*
CircularBuffer::getReadPos() {
    waitForWriterToCatchUp();
    int pos = readPos % size;
    return &data[pos];
}

void
CircularBuffer::finishedReading() {
    readPos++;
}

const int produceMinTime = 100;

void produce(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        int r = rand() % 1000;
        std::this_thread::sleep_for(std::chrono::milliseconds(produceMinTime + r));
        int *p = cb->getWritePos();
        memcpy(p, &i, 4);
        cb->finishedWriting();
    }
}

void consume(CircularBuffer *cb) {
    for (int i = 0; i < 15; i++) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        int *p = cb->getReadPos();
        int j = *p;
        std::cout << "Value: " << j << std::endl;
        cb->finishedReading();
    }
}

int main()
{
    CircularBuffer cb;
    std::thread t1(produce, &cb);
    std::this_thread::sleep_for(std::chrono::milliseconds(2000));
    std::thread t2(consume, &cb);

    t1.join();
    t2.join();
    int k;
    std::cin >> k;
}

标签: c++multithreading

解决方案


std::vector<int>不是线程安全的数据结构。因此,如果您同时从两个线程访问它,那将被视为未定义的行为。你可能会崩溃,有其他问题,或者它可能看起来有效(但仍然是错误的)。

向量内的整数,以及代表你位置的整数也不是线程安全的——读/写不一定是原子的(有无锁的方法可以做到这一点)。

所以,你可以完全实现类似这种无锁的东西,但不是这样。这里的一些信息:https ://www.infoq.com/news/2014/10/cpp-lock-free-programming/

通常,您想查看 std::atomic 中的原语:https ://en.cppreference.com/w/cpp/atomic/atomic

另请参阅:具有原子索引的环形缓冲区


推荐阅读