首页 > 解决方案 > boost::async 读写在一次读取和两次写入后卡住

问题描述

我试图构建一个服务器客户端应用程序,服务器端是带有 boost::asio 异步的 c++,但是当我运行该应用程序时,它在 1 次读取和两次写入后卡住了。这是我删除do_write函数调用时的相关代码我收到了我发送的所有 5 条消息

编辑 当我使用调试器运行时,我能够接收所有五条消息并发送所有响应

void start()
  {
    std::thread t1([this](){do_write();});
    t1.detach();
    std::thread t2([this](){do_read();});
    t2.detach();
  }

void do_write()
  {
    auto self(shared_from_this());
    Message msg;
    while(order_response_queue_.empty())
    {}
    auto order_response = order_response_queue_.front();
    order_response_queue_.pop();
    try {
      auto m = order_response.SerializeToString();
      msg.body_length(std::strlen(m.c_str()));
      std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
      msg.encode_header();
      std::memcpy(data_,msg.data(), msg.length());
      //std::memcpy(static_cast<void *>(msg.data()), data_, msg.length());
    } catch (std::exception& e )
    {
      std::cout << e.what();
    }
    std::cout <<"write: " << msg.body() << "\n";
    boost::asio::async_write(socket_, boost::asio::buffer(data_,msg.length()),
                   [this, self](boost::system::error_code ec, std::size_t /*length*/)
                   {
                      if (ec){
                       std::cerr << "write error:" << ec.value() << " message: " << ec.message() << "\n";
                      }
                      do_write();
                    });
 }


void do_read()
  {
    auto self(shared_from_this());
    Message msg;
    socket_.async_read_some(boost::asio::buffer(res.data(), res.header_length),
                            [this, self](boost::system::error_code ec, std::size_t length) {
                            if (!ec && res.decode_header()) {

                            std::string st(res.body());
                            boost::asio::async_read(socket_, boost::asio::buffer(res.body(),                                                                                      res.body_length()), [this](boost::system::error_code ec, std::size_t length) {
                       if (!ec) {
                           std::cout << "read " << res.body() << "\n";
                           req_.DeserializeFromChar(res.body());
                           order_request_queue_.push(req_);
                        } else {
                           if (ec) {
                               std::cerr << "read error:" << ec.value() << " message: "
                                                                        << ec.message() << "\n";
                           }
                           socket_.close();
                         }
                  });
        }
        do_read();
    });
 }

这里是io_service

class Server
{
 public:
 
  Server(boost::asio::io_service& io_service, short port,std::queue<OrderRequest> &order_request_queue,
         std::queue<OrderResponse> &order_response_queue)
      : acceptor_(io_service, tcp::endpoint(tcp::v4(), port)),
        socket_(io_service) , order_response_queue_(order_response_queue),  order_request_queue_(order_request_queue)
  {
    do_accept();
  }

 private:
  void do_accept(){
    acceptor_.async_accept(socket_,
                           [this](boost::system::error_code ec) {
                             if (!ec) {
                               std::cout << "accept connection\n";
                               std::make_shared<Session>(std::move(socket_),order_request_queue_, order_response_queue_)->start();
                             }

                             do_accept();
                           });
  }


  tcp::acceptor acceptor_;
  tcp::socket socket_;
  std::queue<OrderResponse> &order_response_queue_;
  std::queue<OrderRequest> &order_request_queue_;
};

标签: c++boost-asio

解决方案


您应该使样本独立。

  • 什么是Message
  • 从哪里来data_
  • 从哪里来res_
  • 为什么data_msg.length(),
  • 这是否包括标题长度,
  • req_和之间的关系是什么res
  • 这些的寿命是多少。
  • 你为什么使用线程异步
  • 正在运行的执行上下文在哪里,等等。

这是您给我们的评论:

  • 您正在使用order_response_queue_线程,没有任何锁定。这真的不行,因为do_write线程上没有任何东西会推送到该队列上,所以其他一些线程必须这样做。

老实说,我认为最好的起点是删除线程。您似乎使用它们的主要原因是:

  • “保持循环运行”,这已经是异步执行上下文的一个特性。但是,您已经从完成处理程序链接(例如do_write再次调用),所以没关系

  • 能够“等待”队列中的消息:

     while (order_response_queue_.empty()) {
     }
    

    通常的方法是在推送第一条消息时有条件地启动循环,并在队列为空时让它“停止”。无论如何,这避免了忙碌的旋转。只需start用类似的东西替换

     void start() {
         post(strand_, [this, self = shared_from_this()] {
             if (!order_response_queue_.empty()) {
                 do_write();
             }
    
             do_read();
         });
     }
    

    并有类似的东西

     void enqueue_response(Message stuff) {
         post(strand_, [this, stuff, self = shared_from_this()] {
             order_response_queue_.push(std::move(stuff));
             if (1 == order_response_queue_.size()) { // not idle/already writing
                 do_write();
             }
         });
     }
    

    请注意,使用共享队列的每个操作都在逻辑链上同步,以防您使用多线程执行上下文。

原始草图

我不太清楚与 Message 相关的 Serialize/Deserialize 背后的想法,所以这可能不起作用 - 尽管我觉得它会给你很多关于如何简化事情的线索:

实时编译器资源管理器*

#include <thread>
#include <boost/asio.hpp>
#include <iostream>
#include <memory>
#include <queue>
using boost::asio::ip::tcp;

struct Message {
    struct header_t {
        // TODO add magic bytes
        std::uint32_t body_len = 0; // network byte order
    };
    enum { header_length = sizeof(header_t) };
    std::string _contents{header_length};

    size_t body_length() const {
        return length() - header_length;
    }

    void body_length(size_t n) {
        _contents.resize(n+header_length);
        encode_header();
    }

    bool decode_header() {
        assert(_contents.length() >= header_length);

        auto &header = *reinterpret_cast<header_t *>(_contents.data());
        body_length(ntohl(header.body_len));

        // TODO verify magic bytes
        return true;
    }

    void encode_header() {
        assert(_contents.length() >= header_length);

        auto &header = *reinterpret_cast<header_t *>(_contents.data());
        header.body_len = htonl(_contents.length() - header_length);
    }

    std::string SerializeToString() const { return _contents; }
    void DeserializeFromChar(char const*) { } // TODO IMPLEMENTATION

    char *data()             { return _contents.data();       } 
    char *body()             { return data() + header_length; } 
    char const *data() const { return _contents.data();       } 
    char const *body() const { return data() + header_length; } 
    size_t length() const    { return _contents.size();       } 

    bool operator<(Message const &rhs) const { return _contents < rhs._contents; }
};

struct X : std::enable_shared_from_this<X> {
    std::queue<Message> order_request_queue_, order_response_queue_;

    void start() {
        post(strand_, [this, self = shared_from_this()] {
            if (!order_response_queue_.empty()) {
                do_write();
            }

            do_read();
        });
    }

    void enqueue_response(Message stuff) {
        post(strand_, [this, stuff, self = shared_from_this()] {
            order_response_queue_.push(std::move(stuff));
            if (1 == order_response_queue_.size()) { // not idle/already writing
                do_write();
            }
        });
    }

    void do_write() {
        auto self(shared_from_this());
        auto order_response = std::move(order_response_queue_.front());
        order_response_queue_.pop();

        Message msg;
        try {
            auto m = order_response.SerializeToString();
            msg.body_length(std::strlen(m.c_str()));
            std::memcpy(msg.body(), m.c_str(), std::strlen(m.c_str()));
            msg.encode_header();

            assert(msg.length() <= data_.size());
            std::memcpy(data_.data(), msg.data(), msg.length());
            // std::memcpy(static_cast<void *>(msg.data()), data_,
            // msg.length());
        } catch (std::exception &e) {
            std::cout << e.what();
        }

        std::cout << "write: " << msg.body() << "\n";

        boost::asio::async_write(
            socket_, boost::asio::buffer(data_, msg.length()),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) {
                if (ec) {
                    std::cerr << "write error:" << ec.value()
                              << " message: " << ec.message() << "\n";
                } else if (!order_response_queue_.empty()) {
                    do_write();
                }
            });
    }

    Message res, req_;
    void do_read() {
        auto self(shared_from_this());
        Message msg;
        socket_.async_read_some(
            boost::asio::buffer(res.data(), res.header_length),
            [this, self](boost::system::error_code ec, std::size_t /*length*/) {
                if (!ec && res.decode_header()) {

                    std::string st(res.body());
                    boost::asio::async_read(
                        socket_,
                        boost::asio::buffer(res.body(), res.body_length()),
                        [this](boost::system::error_code ec,
                               std::size_t /*length*/) {
                            if (!ec) {
                                std::cout << "read " << res.body() << "\n";
                                req_.DeserializeFromChar(res.body());
                                order_request_queue_.push(req_);
                            } else {
                                if (ec) {
                                    std::cerr << "read error:" << ec.value()
                                              << " message: " << ec.message()
                                              << "\n";
                                }
                                socket_.close();
                            }
                        });
                }
                do_read();
            });
    }

    using Executor = boost::asio::system_executor;
    boost::asio::strand<Executor> strand_ { Executor{} };
    tcp::socket socket_ { strand_ };
    std::array<char, 1024> data_;

    // quick & dirty connection
    X() { socket_.connect({{}, 8989}); }
};

int main() {
    auto x = std::make_shared<X>();
    x->start();

    {
        Message demo;
        std::string_view greeting{"Hello world!"};
        demo.body_length(greeting.length());
        std::copy(greeting.begin(), greeting.end(), demo.body());

        x->enqueue_response({});
    }

    boost::asio::query(boost::asio::system_executor(),
                       boost::asio::execution::context)
        .join();
}

推荐阅读