asynchronous - Why is there not a "cursor" for caller to async_send_some with ConstBufferSequence buffers?
问题描述
In boost asio is there async_write_some
for programmer to implement their own fine-grained logic similar to async_write
in any fashion, etc:
class koebenhavn : public std::enable_shared_from_this<koebenhavn>
{
private:
boost::asio::ip::tcp::socket socket;
public:
...
void writing(std::vector<const_buffer> & buffers, boost::asio::io_context::strand & strand){
/* io worker may call back at arbitrary amount of transferred bytes */
socket->async_write_some(buffers,
boost::asio::bind_executor(strand, [self=shared_from_this(), &buffers, &strand](boost::system::error_code error, size_t transferred){
/* lambda instead of std::bind for specification if any possible overload */
self->wrote(buffers, strand, error);
}));
}
void wrote(std::vector<const_buffer> & buffers,
boost::asio::io_context::strand & strand,
boost::system::error_code error, size_t transferred){
/* we need to advance buffers to send the rest bytes in reference */
if(!error){
/* transfer occurred */
if(transferred){
/* ----------------> here we need to construct new "cursor" of buffers based on the previous */
std::vector<const_buffer> cursor = advance(buffers, transferred);
/* <---------------- repeat! */
writing(cursor, strand);
}
}
}
...
};
What I want is a simple function to advance the sequence of buffers for the next call to repeat. Now I suspect there isn't any present wheel for that in asio, is there?
There IS a solid reason not to use async_write in my case. Please leave it out if reply.
解决方案
你所期望的存在。它在DynamicBuffer概念中。
read|wrte_some
成员函数是低级的,不采用动态缓冲区(它采用 MutableBuffer 序列)。事实上,我认为使用这些功能并不是 99% 的时间你想要的。例如看到这个评论:
评论
读取操作可能不会读取所有请求的字节数。如果需要确保在异步操作完成之前读取请求的数据量,请考虑使用 async_read 函数
人们经常对 IO 数据包传递有错误的假设。只需使用该async_read
函数,与您的动态缓冲区:
boost::asio::streambuf
boost::asio::dynamic_buffer
带有std::string
变量等
自己做
当然,您可以将动态缓冲区(即具有读/写“光标”)与任何低级接口(期望直接缓冲区)一起使用。但是随后您必须使用prepare()
//接口来管理游标consume()
(commit()
以及可能在底层存储中发生的可选分配)。
您可以将其与iostreams
: 进行比较,而不是int x; std::cin >> x;
您可以非常多地使用底层的streambuf接口,但这只会增加工作量并且容易出错。
当然,有时您需要那个低级接口(例如,当您希望能够更详细地了解哪些数据包在什么时间到达时),但我认为这是规则的例外。
奖励:演示代码
对问题代码的最简单修复是使用组合写入操作:
void writing(std::vector<const_buffer> const& buffers, strand& strand)
{
/* io worker may call back at arbitrary amount of transferred bytes */
boost::asio::async_write(
socket, buffers,
bind_executor(strand, [self = shared_from_this(), strand]
(error_code ec, size_t transferred) {
std::cout << "Transferred " << transferred << " bytes "
<< "(" << ec.message() << ")" << std::endl;
}));
}
这里的“游标”是由图书馆完成的。无话可问。
笔记:
您可能更接受缓冲区类型,接受 DynamicBuffer (v1/v2) 和 ConstBufferSequence 的任何模型:
template <typename Buffers> void writing(Buffers&& buffers, strand& strand) { /* io worker may call back at arbitrary amount of transferred bytes */ boost::asio::async_write( socket, std::forward<Buffers>(buffers), bind_executor(strand, [self = shared_from_this(), strand] (error_code ec, size_t transferred) { std::cout << "Transferred " << transferred << " bytes " << "(" << ec.message() << ")" << std::endl; })); }
您可以将执行程序与 IO 对象相关联,例如:
koebenhavn(boost::asio::io_context& ctx) : socket_(make_strand(ctx)) { }
现在,当您在该套接字上启动任何异步操作时,您可以使用它,
(boost::asio::associated_executor(socket_)
或者默认socket_.get_executor()
情况下让库执行此操作。template <typename Buffers> void writing(Buffers&& buffers) { /* io worker may call back at arbitrary amount of transferred bytes */ async_write( socket, std::forward<Buffers>(buffers), [self = shared_from_this()](error_code ec, size_t transferred) { std::cout << "Transferred " << transferred << " bytes " << "(" << ec.message() << ")" << std::endl; }); }
注意到事情变得简单了吗?是时候把它复杂化了。
writing(...)
您的代码具有将被安全调用的默认假设,即从链中调用。由于我们不确定,请考虑发布或发送到链:dispatch( // socket_.get_executor(), [this, self, b = std::forward_as_tuple(buffers)]() mutable { async_write( // socket_, std::get<0>(b), [self](error_code ec, size_t transferred) { std::cout << "Transferred " << transferred << " bytes " << "(" << ec.message() << ")" << std::endl; }); });
Using
dispatch
的优点是,如果库检测到您已经在链上,它可能能够立即运行任务。
现场演示
#include <boost/asio.hpp>
#include <iostream>
class koebenhavn : public std::enable_shared_from_this<koebenhavn> {
private:
boost::asio::ip::tcp::socket socket_;
using const_buffer = boost::asio::const_buffer;
using error_code = boost::system::error_code;
using strand = boost::asio::io_context::strand;
public:
koebenhavn(boost::asio::io_context& ctx)
: socket_(make_strand(ctx))
{ }
// io worker may call back at arbitrary amount of transferred bytes
template <typename Buffers> void writing(Buffers&& buffers)
{
auto self = shared_from_this();
dispatch( //
socket_.get_executor(),
[this, self, b = std::forward_as_tuple(buffers)]() mutable {
async_write( //
socket_, std::get<0>(b),
[self](error_code ec, size_t transferred) {
std::cout << "Transferred " << transferred << " bytes "
<< "(" << ec.message() << ")" << std::endl;
});
});
}
};
int main() {
boost::asio::io_context io;
auto k = std::make_shared<koebenhavn>(io);
boost::asio::streambuf sb;
k->writing(sb);
std::string s = "hello world\n";
k->writing(boost::asio::buffer(s));
std::array<float, 7> ff{};
std::vector<unsigned char> bb{{0, 1, 2, 3, 4}};
k->writing(std::vector{
boost::asio::buffer(ff),
boost::asio::buffer(bb),
});
}
推荐阅读
- ms-office - Office.CoercionType.Image 的可用性
- node.js - Electron i32 app 找不到 sqlite3 模块
- php - 为什么查询无法使用 mysqli 选择特定 id?
- python - 如何转换 RGB 数组以模拟色盲
- c++ - 将多个整数存储到数组中并打印出来
- android-studio - 片段中的抽屉布局
- javascript - 带括号的正则表达式测试
- javascript - Async 和 await 函数在其他函数之前运行
- c - CRC 实现一个特定的多项式。多项式与代码中使用的多项式有何关系?
- opendaylight - ODL 配置 netconf 服务器失败