首页 > 解决方案 > 如何使用 boost::beast 无阻塞地下载文件并获得进度响应

问题描述

我是从这个例子开始的,所以不会贴出所有代码。我的目标是在不阻塞主线程的情况下下载一个大文件。第二个目标是获得通知,以便我可以更新进度条。我确实有以几种方式能工作的代码。第一种是调用 ioc.run(); 让它开始工作,文件也下载下来了。但是我无论如何都找不到不阻塞地启动会话的方法。

第二种方式是我可以调用 http::async_read_some,并且调用是有效的,但我无法得到可用的响应。我不知道是否有办法传递带捕获的 lambda。

用 #if 0..#else..#endif 在两种方法之间切换。我敢肯定有一个简单的方法,但我就是看不出来。等它能工作之后,我会清理代码,比如设置本地文件名。谢谢。

    // Completion handler for http::async_read_some.
    //
    // The first successful call after the response header has been parsed
    // records the announced body size in file_size; every later call adds
    // bytes_transferred to file_pos and reports progress through
    // response_call. The read is re-armed until the parser reports that the
    // whole message has been received.
    //
    // Returns buffer_.size() after a successful call and 0 when the chain
    // stops because of an error.
    std::size_t on_read_some(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        if (ec) {
            // Stop the chain on failure. The previous `if (ec);` was an empty
            // statement, so a failed read was re-armed again and again.
            if (bValidConnection)
                response_call(ec, file_pos);
            return 0;
        }
        if (!bValidConnection) {
            // Ask the parser for Content-Length instead of searching the raw
            // buffer: the old search ignored a failed find (npos), handed a
            // non-terminated string_view to std::stoi and narrowed to int.
            if (file_parser_.is_header_done()) {
                if (const auto announced = file_parser_.content_length())
                    file_size = static_cast<std::size_t>(*announced);
                // Without a Content-Length field file_size stays 0, meaning
                // "total size unknown".
                bValidConnection = true;
            }
        }
        else {
            file_pos += bytes_transferred;
            response_call(ec, file_pos);
        }
        // Do not start another read once the complete message has arrived.
        if (file_parser_.is_done())
            return buffer_.size();
#if 0
        std::cout << "in on_read_some caller\n";
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            response_call,
            std::placeholders::_1,
            std::placeholders::_2));
#else
        std::cout << "in on_read_some inner\n";
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
#endif
        return buffer_.size();
    }

main 函数,比较凌乱,但......

// Function object whose call operator has the (error_code, bytes transferred)
// handler signature and deliberately does nothing.
struct lambda_type {
    bool bDone{ false };  // never modified by the call operator
    void operator ()(const boost::system::error_code /*ec*/, std::size_t /*bytes_transferred*/) {
    }
};
// Test driver from the question: creates the session, runs the io_context on
// the calling thread, then enters a placeholder message pump that never exits.
int main(int argc, char** argv)
{
    auto const host = "reserveanalyst.com";
    auto const port = "443";
    auto const target = "/downloads/demo.msi";
    // HTTP version 10 only when exactly five arguments are present and
    // argv[4] is "1.0"; otherwise 11.
    int version = argc == 5 && !std::strcmp("1.0", argv[4]) ? 10 : 11;

    boost::asio::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23_client };

    load_root_certificates(ctx);
    //ctx.load_verify_file("ca.pem");

    auto so = std::make_shared<session>(ioc, ctx);
    so->run(host, port, target, version);

    // NOTE(review): bDone is not used anywhere below.
    bool bDone = false;
    // Capture-less lambda: prints the byte count and the error message.
    auto const lambda = [](const boost::system::error_code ec, std::size_t bytes_transferred) {
        std::cout << "data lambda bytes: " << bytes_transferred << " er: " << ec.message() << std::endl;
    };

    // NOTE(review): lambda2 is constructed but never used.
    lambda_type lambda2;
    so->set_response_call(lambda);
    // NOTE(review): the question text identifies this call as the one that
    // blocks the calling thread until the download is over.
    ioc.run();

    std::cout << "not in ioc.run()!!!!!!!!" << std::endl;

    // NOTE(review): session::async_read_some is not part of the snippets shown;
    // it is called only after ioc.run() has returned, and nothing visible here
    // runs the io_context again afterwards.
    so->async_read_some(lambda);

    //pseudo message pump when working.........
    // Endless loop: prints "time" every 250 ms; the return below is never reached.
    for (;;) {
        std::this_thread::sleep_for(250ms);
        std::cout << "time" << std::endl;
    }
    return EXIT_SUCCESS;
}

还有我添加到 class session 中的内容

// Fragment of the question's session class: only the members added for the
// file download are shown (the class is not closed in this snippet).
class session : public std::enable_shared_from_this<session>
{
        // Plain function pointer: a capture-less lambda converts to it, a
        // capturing lambda does not.
        using response_call_type = void(*)(boost::system::error_code ec, std::size_t bytes_transferred);
        // Response parser whose body type is http::file_body.
        http::response_parser<http::file_body> file_parser_;
        // NOTE(review): has no initializer here; it holds a usable value only
        // after set_response_call() has been called.
        response_call_type response_call;
        //
        // Set by on_read_some after its first call.
        bool bValidConnection = false;
        // Running total of bytes_transferred accumulated by on_read_some.
        std::size_t file_pos = 0;
        // Value parsed from the Content-Length field by on_read_some.
        std::size_t file_size = 0;
    
    public:
        // res_ and buffer_ are declared outside this fragment.
        auto& get_result() { return res_; }
        auto& get_buffer() { return buffer_; }
        // Stores the progress callback invoked from on_read_some.
        void set_response_call(response_call_type the_call) { response_call = the_call; }

标签: c++boostasiobeast

解决方案


这就是我最终想出的用于带有消息泵的应用程序。我正在为我的应用程序使用 MFC。对于任何像我一样喜欢 asio 的人来说,这是一个必看的视频。

CppCon 2016:Michael Caisse《使用 Boost.Asio 进行异步 IO》(Asynchronous IO with Boost.Asio)

有几种方法可以运行它。有一个用于开启非阻塞模式的宏定义(#define NO_BLOCKING)。这适用于下载大文件并显示带有取消按钮的进度对话框的情况。要模拟按下取消按钮,请将 bool quit 设置为 true。注释掉 #define NO_BLOCKING,则会在消息泵等待期间下载一个小文件。

我认为在这个应用程序中,我使用 std::thread reader_thread; 的方式是合适的。我不会同时下载多个文件。我已经开始把它接入我的应用程序,一切看起来都很好。

至于传递 lambda 的问题,@Yakk - Adam Nevraumont 非常有帮助。在这里阅读他的答案使关于使用 lambda 捕获的事情变得更加清晰。

如果去掉对 libcrypto 和 libssl 的链接指令,此代码应该可以正常编译和运行。我使用的是 libcrypto-3。这里是 root_certificates.hpp。我检查过了,这个版本运行良好。

完整的代码。

// Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
// Official repository: https://github.com/boostorg/beast
// Example: HTTP SSL client, asynchronous downloads

#include <boost/beast/core.hpp>
#include <boost/beast/http.hpp>
#include <boost/beast/version.hpp>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/ssl/error.hpp>
#include <boost/asio/ssl/stream.hpp>
#include <atomic>
#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <fstream>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <string>
#include <thread>

//don't need the cert in a file method or use 
#include "root_certificates.hpp"
#pragma comment(lib, "C:\\cpp\\openssl-master\\libcrypto.lib")
#pragma comment(lib, "C:\\cpp\\openssl-master\\libssl.lib")

using tcp = boost::asio::ip::tcp;       // from <boost/asio/ip/tcp.hpp>
namespace ssl = boost::asio::ssl;       // from <boost/asio/ssl.hpp>
namespace http = boost::beast::http;    // from <boost/beast/http.hpp>

/// Writes "<what>: <error message>" followed by a newline to std::cerr.
void session_fail(boost::system::error_code ec, char const* what){
    const std::string reason = ec.message();
    std::cerr << what << ": " << reason << "\n";
}

/// Asynchronous HTTPS client session that downloads one target into a file.
///
/// Handler chain: run -> on_resolve -> on_connect -> on_handshake -> on_write
/// -> (on_startup -> on_read_some ... | on_read ...) -> on_shutdown.
/// Each pending operation holds a shared_ptr from shared_from_this(), so the
/// object has to be owned by a std::shared_ptr.
///
/// A failed step is logged with session_fail() and ends the chain; the
/// response callback is not invoked for failures.
///
/// NOTE(review): members are not synchronized. If the io_context runs on
/// another thread, getters such as get_download_size() read values that
/// thread writes - confirm callers only use them after the transfer ended.
class session : public std::enable_shared_from_this<session>
{
public:
    /// Notifications passed to the response callback.
    enum responses {
        resp_null,  ///< never sent by this class; available to callers as "nothing pending"
        resp_ok,    ///< data was read; the size argument is the running byte count
        resp_done,  ///< the transfer ended; the size argument is 0
    };
    using response_call_type = std::function< void(responses, std::size_t)>;
protected:
    tcp::resolver resolver_;
    ssl::stream<tcp::socket> stream_;
    boost::beast::flat_buffer buffer_; // (Must persist between reads)
    http::request<http::empty_body> req_;
    // res_ and header_parser_ are not used by any handler below; they are kept
    // so code that refers to these protected members still compiles.
    http::response<http::string_body> res_;
    boost::beast::http::request_parser<boost::beast::http::string_body> header_parser_;
    http::response_parser<http::file_body> file_parser_;
    response_call_type response_call;
    boost::system::error_code file_open_ec; // result of opening the output file
    //
    std::size_t file_pos = 0;   // running total of bytes_transferred
    std::size_t file_size = 0;  // Content-Length of the response, 0 if unknown

public:
    /// @param ioc      io_context used by the resolver and the TLS stream
    /// @param ctx      TLS context for the stream
    /// @param filename path of the output file; the result of opening it is
    ///                 available through get_file_status()
    explicit session(boost::asio::io_context& ioc, ssl::context& ctx, const char* filename)
        : resolver_(ioc)
        , stream_(ioc, ctx)
    {
        // Raise the parser's body size limit to the maximum so large files are accepted.
        file_parser_.body_limit((std::numeric_limits<std::uint64_t>::max)());
        file_parser_.get().body().open(filename, boost::beast::file_mode::write, file_open_ec);
    }

    /// Prepares the GET request and starts the asynchronous chain with a DNS
    /// lookup. Nothing is executed until the io_context is run.
    void run(char const* host, char const* port, char const* target, int version)
    {
        std::cout << "run" << std::endl;
        // Set the SNI host name on the TLS handle before the handshake.
        if (!SSL_set_tlsext_host_name(stream_.native_handle(), host))
        {
            boost::system::error_code ec{ static_cast<int>(::ERR_get_error()), boost::asio::error::get_ssl_category() };
            std::cerr << ec.message() << "\n";
            return;
        }
        // Set up an HTTP GET request message
        req_.version(version);
        req_.method(http::verb::get);
        req_.target(target);
        req_.set(http::field::host, host);
        req_.set(http::field::user_agent, BOOST_BEAST_VERSION_STRING);

        // Look up the domain name
        resolver_.async_resolve(host, port, std::bind(
            &session::on_resolve,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
    }

    /// Resolver handler: connects to one of the resolved endpoints.
    void on_resolve( boost::system::error_code ec, tcp::resolver::results_type results)
    {
        std::cout << "on_resolve" << std::endl;
        if (ec)
            return session_fail(ec, "resolve");

        // Make the connection on the IP address we get from a lookup
        boost::asio::async_connect( stream_.next_layer(), results.begin(), results.end(), std::bind(
                &session::on_connect,
                shared_from_this(),
                std::placeholders::_1));
    }

    /// Connect handler: starts the TLS handshake.
    void on_connect(boost::system::error_code ec)
    {
        std::cout << "on_connect" << std::endl;
        if (ec)
            return session_fail(ec, "connect");

        // Perform the SSL handshake
        stream_.async_handshake( ssl::stream_base::client, std::bind(
            &session::on_handshake,
            shared_from_this(),
            std::placeholders::_1));
    }

    /// Handshake handler: sends the HTTP request.
    void on_handshake(boost::system::error_code ec)
    {
        std::cout << "on_handshake" << std::endl;
        if (ec)
            return session_fail(ec, "handshake");

        // Send the HTTP request to the remote host
        http::async_write(stream_, req_, std::bind(
            &session::on_write,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
    }

    /// Write handler: reads the response header. With a response callback
    /// installed the body is then read piecewise (on_startup / on_read_some)
    /// so progress can be reported; otherwise it is read in one operation
    /// (on_read).
    void on_write(boost::system::error_code ec, std::size_t /*bytes_transferred*/)
    {
        std::cout << "on_write" << std::endl;
        if (ec)
            return session_fail(ec, "write");
        if (response_call)
            http::async_read_header(stream_, buffer_, file_parser_, std::bind(
                &session::on_startup,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
        else
            http::async_read_header(stream_, buffer_, file_parser_, std::bind(
                &session::on_read,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
    }

    /// Header handler for the progress-reporting path: records the announced
    /// body size and starts the piecewise body read.
    /// @return buffer_.size() when a read was started, otherwise 0.
    std::size_t on_startup(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        std::cout << "on_startup: " << bytes_transferred << std::endl;
        if (ec) {
            session_fail(ec, "on_startup");
            return 0;
        }
        // The header has been parsed, so take Content-Length from the parser.
        // The earlier text search of the raw buffer ignored a failed find,
        // passed a non-terminated string_view to std::stoi and narrowed the
        // result to int.
        if (const auto announced = file_parser_.content_length())
            file_size = static_cast<std::size_t>(*announced);
        else
            std::cerr << "on_startup: response has no Content-Length" << "\n";
        std::cout << "filesize: " << file_size << std::endl;
        // A response without a body is already complete after the header.
        if (file_parser_.is_done()) {
            on_shutdown(ec);
            return 0;
        }
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
        return buffer_.size();
    }

    /// Body handler for the progress-reporting path: accumulates the byte
    /// count, reports resp_ok and either finishes or starts the next read.
    /// @return buffer_.size() when another read was started, otherwise 0.
    std::size_t on_read_some(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        //std::cout << "on_read_some" << std::endl;
        if (ec) {
            session_fail(ec, "on_read_some");
            return 0;
        }
        file_pos += bytes_transferred;
        if (!bytes_transferred && file_pos) {
            on_shutdown(ec);
            return 0;
        }
        // The callback may have been cleared since on_write checked it.
        if (response_call)
            response_call(resp_ok, file_pos);

        // Finish as soon as the parser has the whole message rather than
        // starting one more read just to observe a zero-byte completion.
        if (file_parser_.is_done()) {
            on_shutdown(ec);
            return 0;
        }
        //std::cout << "session::on_read_some: " << file_pos << std::endl;
        http::async_read_some(stream_, buffer_, file_parser_, std::bind(
            &session::on_read_some,
            shared_from_this(),
            std::placeholders::_1,
            std::placeholders::_2));
        return buffer_.size();
    }

    /// Handler for the path without a response callback: called after the
    /// header and again after the body read.
    /// @return buffer_.size() when another read was started, otherwise 0.
    std::size_t on_read(boost::system::error_code ec, std::size_t bytes_transferred)
    {
        // Without this check a failing read was re-issued indefinitely.
        if (ec) {
            session_fail(ec, "on_read");
            return 0;
        }
        file_pos += bytes_transferred;
        if (!bytes_transferred && file_pos) {
            on_shutdown(ec);
            return 0;
        }
        std::cout << "on_read: " << bytes_transferred << std::endl;
        if (file_parser_.is_done()) {
            on_shutdown(ec);
            return 0;
        }
        http::async_read(stream_, buffer_, file_parser_,
            std::bind(&session::on_read,
                shared_from_this(),
                std::placeholders::_1,
                std::placeholders::_2));
        return buffer_.size();
    }

    /// Final step: closes the output file, reports resp_done (if a callback is
    /// installed) and logs any error other than end-of-file.
    void on_shutdown(boost::system::error_code ec)
    {
        std::cout << "on_shutdown" << std::endl;
        if (ec == boost::asio::error::eof) {
            // Rationale:
            // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
            ec.assign(0, ec.category());
        }
        // Close the file before announcing completion so the receiver of
        // resp_done can open it right away.
        file_parser_.get().body().close();
        if (response_call)
            response_call(resp_done, 0);
        if (ec)
            return session_fail(ec, "shutdown");
    }

    /// Error code produced when the constructor opened the output file.
    auto get_file_status() const { return file_open_ec; }
    /// Installs the progress callback; must be set before on_write runs to
    /// select the progress-reporting path.
    void set_response_call(response_call_type the_call) { response_call = the_call; }
    /// Content-Length of the response, or 0 when not (yet) known.
    std::size_t get_download_size() const { return file_size; }
};

#define NO_BLOCKING
int main(int argc, char** argv)
{
    //in a UI app you will need to keep a persistant thread/pool;
    std::thread reader_thread;
    //for an application where this never changes, this can just be put in the session class
    auto const host = "reserveanalyst.com";
    auto const port = "443";
#ifdef NO_BLOCKING // the large file
    auto const target = "/afi.zip";
#else // the small file
    auto const target = "/directive.xml";
#endif
    boost::asio::io_context ioc;
    ssl::context ctx{ ssl::context::sslv23_client };
    load_root_certificates(ctx);
    //end, put in the session class
    auto so = std::make_shared<session>(ioc, ctx, "content.txt");
    so->run(host, port, target, 11);//so->run(target);
    //
    session::responses glb_response;
    bool test_bool = false; //stand in for 'SendMessage' values
    std::size_t buf_size = 0; //stand in for 'SendMessage' values
#ifdef NO_BLOCKING
    auto static const lambda = [&glb_response,&buf_size](session::responses response, std::size_t bytes_transferred) {
        glb_response = response;
        buf_size = bytes_transferred;
    };
    so->set_response_call(lambda);
#else
    ioc.run();
    std::cout << "ioc run exited" << std::endl;
#endif

#ifdef NO_BLOCKING
    reader_thread.swap(std::thread{ [&ioc]() { ioc.run(); } });
#endif
    bool quit = false; //true: as if a cancel button was pushed; won't finish download
    //pseudo message pump
    for (int i = 0; ;++i) {
        
        switch(glb_response){ //ad hoc as if messaged
        case session::responses::resp_ok:
            std::cout << "from sendmessage: " << buf_size << std::endl;
            break;
        case session::responses::resp_done:
            std::cout << "from sendmessage: done" << std::endl;
        }//switch
        glb_response = session::responses::resp_null;
        if (!(i % 10))
            std::cout << "in message pump, stopped: " << std::boolalpha << ioc.stopped() << std::endl;

        std::this_thread::sleep_for(std::chrono::milliseconds(200));
        if (quit && i == 10) //the cancel message
                ioc.stop();
        if (ioc.stopped())//just quit to test join.
            break;
    }
    if(reader_thread.joinable())//in the case a thread was never started
        reader_thread.join();
    std::cout << "exiting, program was quit" << std::endl;
    return EXIT_SUCCESS;
}

推荐阅读