首页 > 解决方案 > 使用协程从 epoll_wait 处理事件的多线程

问题描述

我有这样的 io 上下文类:

    class IOContext
    {
    public:
        explicit IOContext(uint32_t eventPoolCount);
        ~IOContext();

        IOContext(IOContext&) = delete;
        IOContext& operator=(IOContext&) = delete;
        IOContext(IOContext&&) = delete;
        IOContext& operator=(IOContext&&) = delete;

        void processAwaitingEvents(int timeout);

        void scheduleOperation(std::derived_from<IOOperation> auto& operation)
        {
            // ok?
            if(0 == epoll_ctl(this->epollFD, EPOLL_CTL_ADD, operation.fd, &operation.settings))
                return;

            if(errno == EEXIST)
                // re-init event
                if(0 == epoll_ctl(this->epollFD, EPOLL_CTL_MOD, operation.fd, &operation.settings))
                    return;

            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

    private:
        tinycoro::Generator<IOOperation::CoroHandle> yieldAwaitingEvents(int timeout);

        std::unique_ptr<epoll_event[]> eventsList;
        const uint32_t eventPoolCount;
        int epollFD = -1;
    };

    //.
    //. //constructor, destructor
    //.
    void IOContext::processAwaitingEvents(int timeout)
    {
        for(auto& event : this->yieldAwaitingEvents(timeout))
        {
            event.resume();
        }
    }

    tinycoro::Generator<IOOperation::CoroHandle> IOContext::yieldAwaitingEvents(int timeout)
    {
        int eventCount = epoll_wait(this->epollFD, eventsList.get(), this->eventPoolCount, timeout);
        if(eventCount == -1)
        {
            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

        for(int i = 0; i < eventCount; ++i)
        {
            co_yield std::coroutine_handle<IOOperation>::from_address(eventsList[i].data.ptr);
        }
    }

我想为io事件启用多线程处理(IOOperation是基类,其中包含fd),所以当一些事件准备好时,首先从N个线程中获取并处理它们。所以,我认为我可以将 eventsList 作为 thread_local 变量移动到 cpp,如下所示:

    //.
    //. //constructor, destructor
    //.
    void IOContext::processAwaitingEvents(int timeout)
    {
        for(auto& event : this->yieldAwaitingEvents(timeout))
        {
            event.resume();
        }
    }

    tinycoro::Generator<IOOperation::CoroHandle> IOContext::yieldAwaitingEvents(int timeout)
    {
        thread_local std::unique_ptr<epoll_event[]> eventsList = std::make_unique<epoll_event[]>(this->eventPollCount);
        int eventCount = epoll_wait(this->epollFD, eventsList.get(), this->eventPoolCount, timeout);
        if(eventCount == -1)
        {
            throw std::system_error{errno, std::system_category(), strerror(errno)};
        }

        for(int i = 0; i < eventCount; ++i)
        {
            co_yield std::coroutine_handle<IOOperation>::from_address(eventsList[i].data.ptr);
        }
    }

但我不确定这是个好主意,即这个解决方案是否安全(我做了一些测试,一切都很好,但尽管如此,我还是有疑问)。
我可以将 yieldAwaitingEvents 作为一个公共函数,但我不认为提供原始协程句柄是个好主意......
谢谢!

编辑:我更改了一些代码 - thread_local 变量的初始化已移至 yieldAwaitingEvents 函数

标签: c++coroutineepollc++-coroutine

解决方案


推荐阅读