首页 > 解决方案 > 从共享缓冲区写入文件丢失数据并且程序在没有 cout 的情况下崩溃

问题描述

我正在使用线程和共享缓冲区制作程序。两个线程在后台无限期地运行,一个线程将用数据填充共享缓冲区,另一个线程将共享缓冲区的内容写入文件。

用户可以启动或停止数据填充,导致线程进入等待状态,直到用户再次启动线程。每个循环缓冲区填充 50 个浮点数。

这是代码:


#include <iostream>
#include <vector>
#include <iterator>
#include <utility>
#include <fstream>
#include <condition_variable>
#include <mutex>
#include <thread>

using namespace std;

std::mutex m;
std::condition_variable cv;
std::vector<std::vector<float>> datas;
bool keep_running = true, start_running = false;

void writing_thread()
{
    ofstream myfile;

    bool opn = false;

    while(1)
    {

        while(keep_running)
        {
            // Open the file only once
            if(!opn)
            {
                myfile.open("IQ_Datas.txt");
                opn = true;

            }


            // Wait until main() sends data
            std::unique_lock<std::mutex> lk(m);

            cv.wait(lk, [] {return !datas.empty();});


            auto d = std::move(datas);


            lk.unlock();


            for(auto &entry : d)
            {
                for(auto &e : entry)
                    myfile << e << endl;
            }


        }

        if(opn)
        {
            myfile.close();
            opn = false;
        }

    }
}

void sending_thread()
{

    std::vector<float> m_buffer;
    int cpt=0;
    //Fill the buffer with 50 floats
    for(float i=0; i<50; i++)
        m_buffer.push_back(i);

    while(1)
    {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] {return keep_running && start_running;});

        }
        while(keep_running)
        {

            //Each loop d is containing 50 floats
            std::vector<float> d = m_buffer;

            cout << "in3" << endl; //Commenting this line makes the program crash

            {
                std::lock_guard<std::mutex> lk(m);
                if (!keep_running)break;
                datas.push_back(std::move(d));
            }
            cv.notify_one();
            cpt++;
        }

        cout << "Total data: " << cpt*50 << endl;
        cpt = 0;
    }
}
void start()
{
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = true;
    }
    cv.notify_all();
}
void stop()
{
    {
        std::unique_lock<std::mutex> lk(m);
        start_running = false;
    }
    cv.notify_all();
}

int main()
{
    int go = 0;
    thread t1(sending_thread);
    thread t2(writing_thread);

    t1.detach();
    t2.detach();

    while(1)
    {

        std::cin >> go;

        if(go == 1)
        {
            start();
            keep_running = true;
        }
        else if(go == 0)
        {
            stop();
            keep_running = false;
        }


    }

    return 0;
}


我对这段代码有 2 个问题:

为什么使用 cout 程序运行正确?是什么导致数据丢失?是因为sending_thread填充缓冲区太快而writing_thread写入文件需要太多时间吗?

编辑:一些精度,添加更多 coutsending_thread似乎可以解决所有问题。第一个线程产生了 2100 万个浮点数,第二个线程成功写入了 2100 万个浮点数。似乎没有 cout,生产者线程工作得太快,消费者线程无法在将数据写入文件时不断从共享缓冲区中检索数据。

标签: c++multithreadingfilecrash

解决方案


避免:

Moved-from object 'datas' of type 'std::vector' is moved:
        auto d = std::move(datas);
                 ^~~~~~~~~~~~~~~~

替换这个:

        // Wait until main() sends data
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, [] {return !datas.empty();});
        auto d = std::move(datas);
        lk.unlock();

有了这个:

        // Wait until main() sends data            
        std::vector<std::vector<float>> d;
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] { return !datas.empty(); });
            datas.swap(d);
        }

bool还将从多个线程访问的变量替换为std::atomic_boolor std::atomic_flag

来自bad_alloc比它sending_thread快得多,writing_thread所以它会耗尽内存。当您放慢速度sending_thread(使用打印)时,问题就不那么明显了,但是您应该进行一些同步才能正确完成。您可以围绕它创建一个包装器类并提供插入和提取方法以确保所有访问都正确同步并为其提供最大数量的元素。一个例子:

template<typename T>
class atomic2dvector {
public:
    atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}

    atomic2dvector(const atomic2dvector&) = delete;
    atomic2dvector(atomic2dvector&&) = delete;
    atomic2dvector& operator=(const atomic2dvector&) = delete;
    atomic2dvector& operator=(atomic2dvector&&) = delete;

    ~atomic2dvector() { shutdown(); }

    bool insert_one(std::vector<T>&& other) {
        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_current_elements + m_data.size() > m_max_elements && m_shutdown == false)
            m_cv.wait(lock);
        if(m_shutdown) return false;

        m_current_elements += other.size();
        m_data.emplace_back(std::forward<std::vector<T>>(other));

        m_cv.notify_one();
        return true;
    }
    std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        if(m_shutdown == false) {
            m_current_elements = 0;
            return_value.swap(m_data);
        } else {
            // return an empty vector if we should shutdown
        }
        m_cv.notify_one();

        return return_value;
    }

    bool is_active() const { return m_shutdown == false; }

    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

private:
    size_t m_max_elements;
    size_t m_current_elements = 0;
    std::atomic<bool> m_shutdown = false;
    std::condition_variable m_cv{};
    std::mutex m_mtx{};
    std::vector<std::vector<T>> m_data{};
};

如果您想在关机后继续提取数据,您可以更改extract_all()为:

   std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        m_current_elements = 0;
        return_value.swap(m_data);
        m_cv.notify_one();

        return return_value;
    }

一个完整的示例可能如下所示:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <fstream>
#include <iostream>
#include <iterator>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

using namespace std;

template<typename T>
class atomic2dvector {
public:
    atomic2dvector(size_t max_elements) : m_max_elements(max_elements) {}
    atomic2dvector(const atomic2dvector&) = delete;
    atomic2dvector(atomic2dvector&&) = delete;
    atomic2dvector& operator=(const atomic2dvector&) = delete;
    atomic2dvector& operator=(atomic2dvector&&) = delete;

    ~atomic2dvector() { shutdown(); }

    bool insert_one(std::vector<T>&& other) {
        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_current_elements + m_data.size() > m_max_elements &&
              m_shutdown == false)
            m_cv.wait(lock);
        if(m_shutdown) return false;

        m_current_elements += other.size();
        m_data.emplace_back(std::forward<std::vector<T>>(other));

        m_cv.notify_one();
        return true;
    }
    std::vector<std::vector<T>> extract_all() {
        std::vector<std::vector<T>> return_value;

        std::unique_lock<std::mutex> lock(m_mtx);
        while(m_data.empty() && m_shutdown == false) m_cv.wait(lock);

        m_current_elements = 0;
        return_value.swap(m_data);
        m_cv.notify_one();

        return return_value;
    }

    bool is_active() const { return m_shutdown == false; }

    void shutdown() {
        m_shutdown = true;
        m_cv.notify_all();
    }

private:
    size_t m_max_elements;
    size_t m_current_elements = 0;
    std::atomic<bool> m_shutdown = false;
    std::condition_variable m_cv{};
    std::mutex m_mtx{};
    std::vector<std::vector<T>> m_data{};
};

std::mutex m;
std::condition_variable cv;
atomic2dvector<float> datas(256 * 1024 * 1024 / sizeof(float)); // 0.25 GiB limit
std::atomic_bool start_running = false;

void writing_thread() {
    std::ofstream myfile("IQ_Datas.txt");
    if(myfile) {
        std::cout << "writing_thread waiting\n";

        std::vector<std::vector<float>> d;
        while((d = datas.extract_all()).empty() == false) {
            std::cout << "got " << d.size() << "\n";

            for(auto& entry : d) {
                for(auto& e : entry) myfile << e << "\n";
            }
            std::cout << "wrote " << d.size() << "\n\n";
        }
    }
    std::cout << "writing_thread shutting down\n";
}

void sending_thread() {
    std::vector<float> m_buffer;
    std::uintmax_t cpt = 0;
    // Fill the buffer with 50 floats
    for(float i = 0; i < 50; i++) m_buffer.push_back(i);

    while(true) {
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, [] {
                return start_running == true || datas.is_active() == false;
            });
        }
        if(datas.is_active() == false) break;
        std::cout << "sending...\n";
        while(start_running == true) {
            // Each loop d is containing 50 floats
            std::vector<float> d = m_buffer;
            if(datas.insert_one(std::move(d)) == false) break;
            cpt++;
        }
        cout << "Total data: " << cpt * 50 << endl;
        cpt = 0;
    }
    std::cout << "sending_thread shutting down\n";
}

void start() {
    std::unique_lock<std::mutex> lk(m);
    start_running = true;
    cv.notify_all();
}
void stop() {
    std::unique_lock<std::mutex> lk(m);
    start_running = false;
    cv.notify_all();
}
void quit() {
    datas.shutdown();
    cv.notify_all();
}

int main() {
    int go = 0;
    thread t1(sending_thread);
    thread t2(writing_thread);

    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    std::cout << "Enter 1 to make the sending thread send and 0 to make it stop "
                 "sending. Enter a non-integer to shutdown.\n";

    while(std::cin >> go) {
        if(go == 1) {
            start();
        } else if(go == 0) {
            stop();
        }
    }
    std::cout << "--- shutting down ---\n";
    quit();

    std::cout << "joining threads\n";
    t1.join();
    std::cout << "t1 joined\n";
    t2.join();
    std::cout << "t2 joined\n";
}

推荐阅读