c++ - boost::async 读写在一次读取和两次写入后卡住
问题描述
我试图构建一个服务器客户端应用程序,服务器端是带有 boost::asio 异步的 c++,但是当我运行该应用程序时,它在 1 次读取和两次写入后卡住了。这是我删除do_write
函数调用时的相关代码我收到了我发送的所有 5 条消息
编辑 当我使用调试器运行时,我能够接收所有五条消息并发送所有响应
void start()
{
std::thread t1([this](){do_write();});
t1.detach();
std::thread t2([this](){do_read();});
t2.detach();
}
void do_write()
{
auto self(shared_from_this());
Message msg;
while(order_response_queue_.empty())
{}
auto order_response = order_response_queue_.front();
order_response_queue_.pop();
try {
auto m = order_response.SerializeToString();
msg.body_length(std::strlen(m.c_str()));
std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
msg.encode_header();
std::memcpy(data_,msg.data(), msg.length());
//std::memcpy(static_cast<void *>(msg.data()), data_, msg.length());
} catch (std::exception& e )
{
std::cout << e.what();
}
std::cout <<"write: " << msg.body() << "\n";
boost::asio::async_write(socket_, boost::asio::buffer(data_,msg.length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/)
{
if (ec){
std::cerr << "write error:" << ec.value() << " message: " << ec.message() << "\n";
}
do_write();
});
}
void do_read()
{
auto self(shared_from_this());
Message msg;
socket_.async_read_some(boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec, std::size_t length) {
if (!ec && res.decode_header()) {
std::string st(res.body());
boost::asio::async_read(socket_, boost::asio::buffer(res.body(), res.body_length()), [this](boost::system::error_code ec, std::size_t length) {
if (!ec) {
std::cout << "read " << res.body() << "\n";
req_.DeserializeFromChar(res.body());
order_request_queue_.push(req_);
} else {
if (ec) {
std::cerr << "read error:" << ec.value() << " message: "
<< ec.message() << "\n";
}
socket_.close();
}
});
}
do_read();
});
}
这里是io_service
class Server
{
public:
Server(boost::asio::io_service& io_service, short port,std::queue<OrderRequest> &order_request_queue,
std::queue<OrderResponse> &order_response_queue)
: acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
socket_(io_service) , order_response_queue_(order_response_queue), order_request_queue_(order_request_queue)
{
do_accept();
}
private:
void do_accept(){
acceptor_.async_accept(socket_,
[this](boost::system::error_code ec) {
if (!ec) {
std::cout << "accept connection\n";
std::make_shared<Session>(std::move(socket_),order_request_queue_, order_response_queue_)->start();
}
do_accept();
});
}
tcp::acceptor acceptor_;
tcp::socket socket_;
std::queue<OrderResponse> &order_response_queue_;
std::queue<OrderRequest> &order_request_queue_;
};
解决方案
您应该使样本独立。
- 什么是
Message
, - 从哪里来
data_
, - 从哪里来
res
_ - 为什么
data_
与msg.length()
, - 这是否包括标题长度,
req_
和之间的关系是什么res
- 这些的寿命是多少。
- 你为什么使用线程异步
- 正在运行的执行上下文在哪里,等等。
这是您给我们的评论:
- 您正在使用
order_response_queue_
线程,没有任何锁定。这真的不行,因为do_write
线程上没有任何东西会推送到该队列上,所以其他一些线程必须这样做。
老实说,我认为最好的起点是删除线程。您似乎使用它们的主要原因是:
“保持循环运行”,这已经是异步执行上下文的一个特性。但是,您已经从完成处理程序链接(例如
do_write
再次调用),所以没关系能够“等待”队列中的消息:
while (order_response_queue_.empty()) { }
通常的方法是在推送第一条消息时有条件地启动循环,并在队列为空时让它“停止”。无论如何,这避免了忙碌的旋转。只需
start
用类似的东西替换void start() { post(strand_, [this, self = shared_from_this()] { if (!order_response_queue_.empty()) { do_write(); } do_read(); }); }
并有类似的东西
void enqueue_response(Message stuff) { post(strand_, [this, stuff, self = shared_from_this()] { order_response_queue_.push(std::move(stuff)); if (1 == order_response_queue_.size()) { // not idle/already writing do_write(); } }); }
请注意,使用共享队列的每个操作都在逻辑链上同步,以防您使用多线程执行上下文。
原始草图
我不太清楚与 Message 相关的 Serialize/Deserialize 背后的想法,所以这可能不起作用 - 尽管我觉得它会给你很多关于如何简化事情的线索:
#include <thread>
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <queue>
using boost::asio::ip::tcp;
struct Message {
struct header_t {
// TODO add magic bytes
std::uint32_t body_len = 0; // network byte order
};
enum { header_length = sizeof(header_t) };
std::string _contents{header_length};
size_t body_length() const {
return length() - header_length;
}
void body_length(size_t n) {
_contents.resize(n+header_length);
encode_header();
}
bool decode_header() {
assert(_contents.length() >= header_length);
auto &header = *reinterpret_cast<header_t *>(_contents.data());
body_length(ntohl(header.body_len));
// TODO verify magic bytes
return true;
}
void encode_header() {
assert(_contents.length() >= header_length);
auto &header = *reinterpret_cast<header_t *>(_contents.data());
header.body_len = htonl(_contents.length() - header_length);
}
std::string SerializeToString() const { return _contents; }
void DeserializeFromChar(char const*) { } // TODO IMPLEMENTATION
char *data() { return _contents.data(); }
char *body() { return data() + header_length; }
char const *data() const { return _contents.data(); }
char const *body() const { return data() + header_length; }
size_t length() const { return _contents.size(); }
bool operator<(Message const &rhs) const { return _contents < rhs._contents; }
};
struct X : std::enable_shared_from_this<X> {
std::queue<Message> order_request_queue_, order_response_queue_;
void start() {
post(strand_, [this, self = shared_from_this()] {
if (!order_response_queue_.empty()) {
do_write();
}
do_read();
});
}
void enqueue_response(Message stuff) {
post(strand_, [this, stuff, self = shared_from_this()] {
order_response_queue_.push(std::move(stuff));
if (1 == order_response_queue_.size()) { // not idle/already writing
do_write();
}
});
}
void do_write() {
auto self(shared_from_this());
auto order_response = std::move(order_response_queue_.front());
order_response_queue_.pop();
Message msg;
try {
auto m = order_response.SerializeToString();
msg.body_length(std::strlen(m.c_str()));
std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
msg.encode_header();
assert(msg.length() <= data_.size());
std::memcpy(data_.data(), msg.data(), msg.length());
// std::memcpy(static_cast<void *>(msg.data()), data_,
// msg.length());
} catch (std::exception &e) {
std::cout << e.what();
}
std::cout << "write: " << msg.body() << "\n";
boost::asio::async_write(
socket_, boost::asio::buffer(data_, msg.length()),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (ec) {
std::cerr << "write error:" << ec.value()
<< " message: " << ec.message() << "\n";
} else if (!order_response_queue_.empty()) {
do_write();
}
});
}
Message res, req_;
void do_read() {
auto self(shared_from_this());
Message msg;
socket_.async_read_some(
boost::asio::buffer(res.data(), res.header_length),
[this, self](boost::system::error_code ec, std::size_t /*length*/) {
if (!ec && res.decode_header()) {
std::string st(res.body());
boost::asio::async_read(
socket_,
boost::asio::buffer(res.body(), res.body_length()),
[this](boost::system::error_code ec,
std::size_t /*length*/) {
if (!ec) {
std::cout << "read " << res.body() << "\n";
req_.DeserializeFromChar(res.body());
order_request_queue_.push(req_);
} else {
if (ec) {
std::cerr << "read error:" << ec.value()
<< " message: " << ec.message()
<< "\n";
}
socket_.close();
}
});
}
do_read();
});
}
using Executor = boost::asio::system_executor;
boost::asio::strand<Executor> strand_ { Executor{} };
tcp::socket socket_ { strand_ };
std::array<char, 1024> data_;
// quick & dirty connection
X() { socket_.connect({{}, 8989}); }
};
int main() {
auto x = std::make_shared<X>();
x->start();
{
Message demo;
std::string_view greeting{"Hello world!"};
demo.body_length(greeting.length());
std::copy(greeting.begin(), greeting.end(), demo.body());
x->enqueue_response({});
}
boost::asio::query(boost::asio::system_executor(),
boost::asio::execution::context)
.join();
}
推荐阅读
- php - URI 的控制器不可调用 symfony2
- r - 如何在 ggplot2 中增加轴标签和图例大小?
- machine-learning - 如何训练 NER 识别单词不是实体?
- c# - HttpContext 在具有 OmniSharp 扩展的 Visual Studio 代码中的工作方式不同
- python - 使用特定方法对嵌套的对象列表进行排序
- javascript - 在nodejs中读取tif图像元数据
- oracle-ebs - FND_STADARD_DATE
- ios - UITableViewCell 选项视图重叠交互
- c# - Windows 不接受自签名 SSL 证书
- python - 如何将属性从 SVG 复制到另一个?