首页 > 解决方案 > POSIX 部分写入、线程安全和锁定

问题描述

即使_write是线程安全的,它也不能保证完全写入,可能会发生部分写入。

如果两个线程写入同一个文件描述符,有没有办法只阻塞文件描述符,而不是使用全局互斥锁来阻塞整个函数?

因此,如果两个线程正在尝试写入fd 1,则一个必须等​​待另一个完成;如果一个正在尝试写入fd 1而另一个正在尝试写入fd 2,那么它们将同时被执行。

我正在寻找 C++ 解决方案。

#include <io.h>

struct IOError {};

void
write(int const fd, char const * buffer, int unsigned size) {
    int result;
    while (size != 0) {
        result = ::_write(fd, buffer, size);
        if (result < 0) {
            throw IOError();
        }
        buffer += result;
        size -= result;
    }
}

int
main() {
    write(1, "Hello, world!\n", 14);
    return 0;
}

标签: c++multithreadinglockingposixfile-descriptor

解决方案


一个简单的解决方案是为每个文件描述符使用一个互斥锁。

您只需要一个全局互斥锁即可为给定描述符创建一个唯一互斥锁并将其存储到映射中,只要您至少可以使用 C++11 进行编译(例如本地静态线程安全

但是您需要将地图创建/地图搜索的结果存储到某些东西中(因为 STL 容器本身不是线程安全的)。我在这里使用共享指针来提供服务。他们会自动删除。

如果您想使用异常,则std::lock_guard< std::mutex > RAII 可帮助您在发生不良情况时释放互斥锁(请参阅 QA,例如Unlock mutex on exception

您可以在此处复制/粘贴(面向 Linux 的)代码。只需将 NB_ELEM 调整为高于系统管道尺寸的值。

#include <unistd.h>
#include <mutex>
#include <map>
#include <memory>

#include <future> // For async and testing
#include <vector> // For testing
#include <iostream> // here for testing std::cout 
#include <fcntl.h> // For testing fcntl to display pipe (std out) size

void my_write(int const fd, char const * buffer, ssize_t size) 
{

    static std::map<int,std::shared_ptr<std::mutex>> MM;
    static std::mutex global_mutex;
    
    ssize_t result;
    std::shared_ptr<std::mutex> msptr;
    
    {
        std::lock_guard<std::mutex> lock(global_mutex);
        if ( MM.cend() == MM.find(fd) ) {
            msptr = std::make_shared<std::mutex>();
            MM[fd] = msptr;
        }   
        else {
            msptr = MM[fd];
        }
    }
    
    std::lock_guard<std::mutex> lock(*msptr);

    while (size != 0) {
        result = write(fd, buffer, size);
        if (result < 0) {
            //throw if you want
        }
        buffer += result;
        size -= result;
    }
    
}

const size_t NB_ELEM = 100000u;

std::vector<char> va(NB_ELEM,'a');

std::vector<char> vb(NB_ELEM,'b');


int
main() 
{
    va.push_back('\n');
    vb.push_back('\n');
    
    std::cout << "stdout pipe size is : " << fcntl( 1, F_GETPIPE_SZ ) << "\n" << std::flush;
    
    {
    #if 1
        auto fut2 = std::async([](){my_write(1, vb.data(), vb.size());});
        auto fut1 = std::async([](){my_write(1, va.data(), va.size());});
    #else
        auto fut2 = std::async([](){write(1, vb.data(), vb.size());});
        auto fut1 = std::async([](){write(1, va.data(), va.size());});
    #endif
    }
    
    std::cout << "Bye ! \n" << std::flush;
    
    return 0;
}

关于大肠杆菌系统

标准输出管道尺寸为:65536

使用 my_write(...) 你会得到这样的结果

bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb...

啊啊啊啊啊啊啊啊啊啊啊啊……

再见 !

并且使用正常的 write(...) 你有时可能会得到

bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb...bbbaaaaaaaaa...aaaaaaaaabbb...

bbbbbbbbbbbbbbbbbbbbbaaaaaaaa....aaa

再见 !


推荐阅读