首页 > 解决方案 > Why is there not a "cursor" for caller to async_send_some with ConstBufferSequence buffers?

问题描述

In boost asio is there async_write_some for programmer to implement their own fine-grained logic similar to async_write in any fashion, etc:

class koebenhavn : public std::enable_shared_from_this<koebenhavn>
{
private:
    boost::asio::ip::tcp::socket socket;
public:
    ...
    void writing(std::vector<const_buffer> & buffers, boost::asio::io_context::strand & strand){
        /* io worker may call back at arbitrary amount of transferred bytes */
        socket->async_write_some(buffers, 
             boost::asio::bind_executor(strand, [self=shared_from_this(), &buffers, &strand](boost::system::error_code error, size_t transferred){
                 /* lambda instead of std::bind for specification if any possible overload */
                 self->wrote(buffers, strand, error);
             }));
    }
    void wrote(std::vector<const_buffer> & buffers, 
               boost::asio::io_context::strand & strand,
               boost::system::error_code error, size_t transferred){
               /* we need to advance buffers to send the rest bytes in reference */
               if(!error){
                 /* transfer occurred */
                 if(transferred){
  /* ----------------> here we need to construct new "cursor" of buffers based on the previous */
                   std::vector<const_buffer> cursor = advance(buffers, transferred);
  /* <---------------- repeat! */
                   writing(cursor, strand);
                 }
               }
    }
    ...
};

What I want is a simple function to advance the sequence of buffers for the next call to repeat. Now I suspect there isn't any present wheel for that in asio, is there?

There IS a solid reason not to use async_write in my case. Please leave it out if reply.

标签: asynchronousboostasio

解决方案


你所期望的存在。它在DynamicBuffer概念中。

read|wrte_some成员函数是低级的,不采用动态缓冲区(它采用 MutableBuffer 序列)。事实上,我认为使用这些功能并不是 99% 的时间你想要的。例如看到这个评论

评论

读取操作可能不会读取所有请求的字节数。如果需要确保在异步操作完成之前读取请求的数据量,请考虑使用 async_read 函数

人们经常对 IO 数据包传递有错误的假设。只需使用该async_read函数,与您的动态缓冲区:

自己做

当然,您可以将动态缓冲区(即具有读/写“光标”)与任何低级接口(期望直接缓冲区)一起使用。但是随后您必须使用prepare()//接口来管理游标consume()commit()以及可能在底层存储中发生的可选分配)。

您可以将其与iostreams: 进行比较,而不是int x; std::cin >> x;您可以非常多地使用底层的streambuf接口,但这只会增加工作量并且容易出错。

当然,有时您需要那个低级接口(例如,当您希望能够更详细地了解哪些数据包在什么时间到达时),但我认为这是规则的例外。

奖励:演示代码

对问题代码的最简单修复是使用组合写入操作:

void writing(std::vector<const_buffer> const& buffers, strand& strand)
{
    /* io worker may call back at arbitrary amount of transferred bytes */
    boost::asio::async_write(
        socket, buffers,
        bind_executor(strand, [self = shared_from_this(), strand]
            (error_code ec, size_t transferred) {
                std::cout << "Transferred " << transferred << " bytes "
                          << "(" << ec.message() << ")" << std::endl;
            }));
}

这里的“游标”是由图书馆完成的。无话可问。

笔记:

  1. 您可能更接受缓冲区类型,接受 DynamicBuffer (v1/v2) 和 ConstBufferSequence 的任何模型:

    template <typename Buffers>
    void writing(Buffers&& buffers, strand& strand)
    {
        /* io worker may call back at arbitrary amount of transferred bytes */
        boost::asio::async_write(
            socket, std::forward<Buffers>(buffers),
            bind_executor(strand, [self = shared_from_this(), strand]
                (error_code ec, size_t transferred) {
                    std::cout << "Transferred " << transferred << " bytes "
                              << "(" << ec.message() << ")" << std::endl;
                }));
    }
    
  2. 您可以将执行程序与 IO 对象相关联,例如:

    koebenhavn(boost::asio::io_context& ctx)
        : socket_(make_strand(ctx))
    { }
    

    现在,当您在该套接字上启动任何异步操作时,您可以使用它,(boost::asio::associated_executor(socket_)或者默认socket_.get_executor() 情况下让库执行此操作。

    template <typename Buffers>
        void writing(Buffers&& buffers)
    {
        /* io worker may call back at arbitrary amount of transferred bytes */
        async_write(
            socket, std::forward<Buffers>(buffers),
            [self = shared_from_this()](error_code ec, size_t transferred) {
                std::cout << "Transferred " << transferred << " bytes "
                          << "(" << ec.message() << ")" << std::endl;
            });
    }
    
  3. 注意到事情变得简单了吗?是时候把它复杂化了。writing(...)您的代码具有将被安全调用的默认假设,即从链中调用。由于我们不确定,请考虑发布或发送到链:

    dispatch( //
        socket_.get_executor(),
        [this, self, b = std::forward_as_tuple(buffers)]() mutable {
            async_write( //
                socket_, std::get<0>(b),
                [self](error_code ec, size_t transferred) {
                    std::cout << "Transferred " << transferred << " bytes "
                              << "(" << ec.message() << ")" << std::endl;
                });
        });
    

    Usingdispatch的优点是,如果库检测到您已经在链上,它可能能够立即运行任务。

现场演示

住在科利鲁

#include <boost/asio.hpp>
#include <iostream>

class koebenhavn : public std::enable_shared_from_this<koebenhavn> {
  private:
    boost::asio::ip::tcp::socket socket_;
    using const_buffer = boost::asio::const_buffer;
    using error_code   = boost::system::error_code;
    using strand       = boost::asio::io_context::strand;

  public:
    koebenhavn(boost::asio::io_context& ctx)
        : socket_(make_strand(ctx))
    { }

    // io worker may call back at arbitrary amount of transferred bytes
    template <typename Buffers> void writing(Buffers&& buffers)
    {
        auto self = shared_from_this();
        dispatch( //
            socket_.get_executor(),
            [this, self, b = std::forward_as_tuple(buffers)]() mutable {
                async_write( //
                    socket_, std::get<0>(b),
                    [self](error_code ec, size_t transferred) {
                        std::cout << "Transferred " << transferred << " bytes "
                                  << "(" << ec.message() << ")" << std::endl;
                    });
            });
    }
};

int main() {
    boost::asio::io_context io;
    auto k = std::make_shared<koebenhavn>(io);

    boost::asio::streambuf sb;

    k->writing(sb);

    std::string s = "hello world\n";
    k->writing(boost::asio::buffer(s));

    std::array<float, 7>       ff{};
    std::vector<unsigned char> bb{{0, 1, 2, 3, 4}};

    k->writing(std::vector{
        boost::asio::buffer(ff),
        boost::asio::buffer(bb),
    });
}

推荐阅读