首页 > 解决方案 > 线程不退出?

问题描述

我正在尝试编写一个程序来解决 C++ 中线程的生产者消费者问题,并且据我所知,该程序可以正常工作,直到线程应该使用 join() 函数退出。(Product 对象是一个简单的数据容器)。

    #include <iostream>
    #include <random>
    #include <cstdlib>
    #include <ctime>
    #include <chrono>
    #include <sstream>
    #include <vector>
    #include <stack>
    #include <thread>
    #include <mutex>
    #include <atomic>
    #include <condition_variable>
    #include <Product.h>
    
    using namespace std;
    
    const int max_items = 100;
    atomic<int> itemNum(0);
    atomic<int> numProducersWorking(0);
    stack<Product> items;
    int maxBuffer;
    float storeSales[10];
    float monthSales[12];
    float totalSales;
    mutex xmutex;
    condition_variable isNotFull;
    condition_variable isNotEmpty;
    
    int intRand(const int & min, const int & max) {
        static thread_local mt19937 generator(time(0));
        uniform_int_distribution<int> distribution(min,max);
        return distribution(generator);
    }
    
    float floatRand(const float & min, const float & max) {
        static thread_local mt19937 generator(time(0));
        uniform_real_distribution<float> distribution(min,max);
        return distribution(generator);
    }
    
    void produce(int pId)
    {
        unique_lock<mutex> lock(xmutex);
        int day, month, year, id, regNum;
        float saleAmnt;
        Product item;
    
        id = pId;
        day = intRand(1, 30);
        month = intRand(1, 12);
        year = 20;
        regNum = intRand(1, 6);
        saleAmnt = floatRand(0.50, 999.99);
    
        item = Product(day, month, year, id, regNum, saleAmnt);
    
        isNotFull.wait(lock, [] { return items.size() != maxBuffer; });
        if(itemNum < max_items)
        {
            items.push(item);
            itemNum++;
        }
    
        isNotEmpty.notify_all();
    }
    
    void consume(int cId)
    {
        unique_lock<mutex> lock(xmutex);
        Product item;
    
        isNotEmpty.wait(lock, [] { return items.size() > 0; });
        item = items.top();
        items.pop();
        storeSales[item.getStoreID()-1] += item.getSaleAmnt();
        monthSales[item.getMonth()-1] += item.getSaleAmnt();
        totalSales += item.getSaleAmnt();
    
        isNotFull.notify_all();
    }
    
    void producer(int id)
    {
        ++numProducersWorking;
        while(itemNum < max_items)
        {
            produce(id);
            this_thread::sleep_for(chrono::milliseconds(intRand(5, 40)));
        }
        --numProducersWorking;
    }
    
    void consumer(int id)
    {
    
        while(numProducersWorking != 0 || items.size() > 0 )
            {
                consume(id);
            }
    
    }
    
    
    int main()
    {
        int p, c, b;
    
        p = 5;
        c = 5;
        b = 5;
    
        maxBuffer = b;
    
        vector<thread> prodsCons;
    
        auto start = chrono::high_resolution_clock::now();
    
        //create producers
        for(int i = 1; i <= p; i++)
        {
            prodsCons.push_back(thread(producer, i));
        }
    
         //create consumers
        for(int i = 0; i < c; i++)
        {
            prodsCons.push_back(thread(consumer, i));
        }
    
        int x = 0;
        //wait for consumers and producers to finish
        for(auto& th : prodsCons)
        {
            th.join();
            cout<<"thread "<<x<<" joined"<<endl;
            x++;
        }
    
        auto stop = chrono::high_resolution_clock::now();
        auto duration = chrono::duration_cast<chrono::microseconds>(stop - start);
    
        cout<<"Store-wide total sales: "<<endl;
        for(int x = 1; x <= p; x++)
        {
            cout<<"  store "<<x<<" sales: $"<<storeSales[x-1]<<endl;
        }
        cout<<"Month-wise total sales: "<<endl;
        for(int x = 1; x <= 12; x++)
        {
            cout<<"  month "<<x<<" sales: $"<<monthSales[x-1]<<endl;
        }
        cout<<"Total sales: $"<<totalSales<<endl;
        cout<<"Simulation time: "<<duration.count()<<" microseconds"<<endl;
    }

输出如下所示:

thread 0 joined
thread 1 joined
thread 2 joined
thread 3 joined
thread 4 joined

表明 10 个线程中有 5 个没有退出(很可能是消费者),因此程序永远不会结束。是否存在未满足的条件,或者我是否错误地实现了互斥锁?

标签: c++multithreadingmutexproducer-consumer

解决方案


一旦消费线程到达内部的 condition_variable::wait 调用consume(),它不会在没有某种信号的情况下返回。

我通常有一个关闭标志,它由与队列相同的互斥锁保护,我的等待条件将基于关闭标志和大小。

当消费者停止时,我获取互斥锁,并设置关闭标志。然后,在退出等待时,我将在关机时立即退出,或者仅在队列也为空时退出。前者是立即关闭,而后者是工作完成后关闭。

此外,对项目堆栈的所有访问都必须受到互斥锁的保护。您在某些地方已经这样做了,但在其他地方却没有。


推荐阅读