首页 > 解决方案 > 如何将 boost io_service 与优先级队列一起使用?

问题描述

我有一个有两个功能的程序。一个是循环计时器,另一个是接收一些套接字。

我发现,如果在计时器触发之前有多个包进入,boost 将运行所有套接字句柄,然后运行计时器句柄。

我写了一个简单的代码来模拟这个时间,如下所示:

#include <iostream>
#include <memory>

#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/steady_timer.hpp>

std::string get_time()
{
    struct timespec time_spec;
    clock_gettime(CLOCK_REALTIME, &time_spec);
    int h  = (int)(time_spec.tv_sec / 60 / 60 % 24);
    int m  = (int)(time_spec.tv_sec / 60 % 60);
    int s  = (int)(time_spec.tv_sec % 60);
    int ms = (int)(time_spec.tv_nsec / 1000);
    char st[50];
    snprintf(st, 50, "[%02d:%02d:%02d:%06d]", h, m, s, ms);

    return std::string(st);
}

void fA()
{
  std::cout << get_time() << " : fA()" << std::endl;
  boost::this_thread::sleep(boost::posix_time::milliseconds(40));
}

void fB()
{
  std::cout << get_time() << " : fB()" << std::endl;
  boost::this_thread::sleep(boost::posix_time::milliseconds(20));
}

int main(int argc, char *argv[])
{
    boost::asio::io_service io;
    std::shared_ptr<boost::asio::io_service::work> work = std::make_shared<boost::asio::io_service::work>(io);

    std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io);
    std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io);

    std::cout << get_time() << " : start" << std::endl;

    t100ms->expires_from_now(std::chrono::milliseconds(100));
    t80ms->expires_from_now(std::chrono::milliseconds(80));

    t100ms->async_wait([&](const boost::system::error_code &_error) {
        if(_error.value() == boost::system::errc::errc_t::success) {
            std::cout << get_time() << " : t100ms" << std::endl;
        }
    });
    t80ms->async_wait([&](const boost::system::error_code &_error) {
        if(_error.value() == boost::system::errc::errc_t::success) {
            std::cout << get_time() << " : t80ms" << std::endl;
            io.post(fA);
            io.post(fB);
        }
    });

    io.run();

    return 0;
}

这段代码的结果是:

[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : fB()
[08:15:40:623186] : t100ms

但是,我想要的结果是:

[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : t100ms
[08:15:40:604037] : fB()

t100ms 可以在 fA 和 fB 之间运行,该时间08:15:40:582721在开始后 100ms 处更接近正确的所需时间 []。

我找到了一个调用示例,它给出了优先级队列的示例。

并尝试通过将我的代码添加到此示例中来修改它。

    ...

    timer.async_wait(pri_queue.wrap(42, middle_priority_handler));


    std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io_service);
    std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io_service);

    std::cout << get_time() << " : start" << std::endl;

    t100ms->expires_from_now(std::chrono::milliseconds(100));
    t80ms->expires_from_now(std::chrono::milliseconds(80));

    t100ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
        if(_error.value() == boost::system::errc::errc_t::success) {
            std::cout << get_time() << " : t100ms" << std::endl;
        }
    }));
    t80ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
        if(_error.value() == boost::system::errc::errc_t::success) {
            std::cout << get_time() << " : t80ms" << std::endl;
            io_service.post(pri_queue.wrap(0, fA));
            io_service.post(pri_queue.wrap(0, fB));
        }
    }));

    while (io_service.run_one())

    ...

但是,结果仍然没有显示在我的脑海中。它如下所示:

[08:30:13:868299] : start
High priority handler
Middle priority handler
Low priority handler
[08:30:13:948437] : t80ms
[08:30:13:948496] : fA()
[08:30:13:988606] : fB()
[08:30:14:008774] : t100ms

我哪里错了?

标签: boost

解决方案


处理程序按照它们发布的顺序运行。

当 80 毫秒到期时,您会立即同时发布fA()fB()。当然,它们将首先运行,因为 t100ms 仍在等待中。

这是您的示例,但已大大简化:

住在科利鲁

#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using boost::asio::io_context;
using boost::asio::steady_timer;
using namespace std::chrono_literals;

namespace {
    static auto now = std::chrono::system_clock::now;
    static auto get_time = [start = now()]{
        return "at " + std::to_string((now() - start)/1ms) + "ms:\t";
    };

    void message(std::string msg) {
        std::cout << (get_time() + msg + "\n") << std::flush; // minimize mixing output from threads
    }

    auto make_task = [](auto name, auto duration) {
        return [=] {
            message(name);
            std::this_thread::sleep_for(duration);
        };
    };
}

int main() {
    io_context io;

    message("start");

    steady_timer t100ms(io, 100ms);
    t100ms.async_wait([&](auto ec) {
        message("t100ms " + ec.message());
    });

    steady_timer t80ms(io, 80ms);
    t80ms.async_wait([&](auto ec) {
        message("t80ms " + ec.message());
        post(io, make_task("task A", 40ms));
        post(io, make_task("task B", 20ms));
    });

    io.run();
}

印刷

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 120ms:       task B
at 140ms:       t100ms Success

一种方法

假设您真的想为操作计时,请考虑运行多个线程。有了这个三个字的变化,输出是:

at 1ms: start
at 81ms:    t80ms Success
at 81ms:    task A
at 82ms:    task B
at 101ms:   t100ms Success

要仍然序列化 A 和 B,请通过更改将它们发布在链上:

post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));

auto s = make_strand(io);
post(s, make_task("task A", 40ms));
post(s, make_task("task B", 20ms));

现在打印

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 100ms:       t100ms Success
at 120ms:       task B

(下面的完整列表)。

没有线程请

当您不想使用线程时(例如为了简单/安全),另一种方法确实需要一个队列。我会考虑简单地写出来:

struct Queue {
    template <typename Ctx>
    Queue(Ctx context) : strand(make_strand(context)) {}

    void add(Task f) {
        post(strand, [this, f=std::move(f)] {
            if (tasks.empty())
                run();
            tasks.push_back(std::move(f));
        });
    }

  private:
    boost::asio::any_io_executor strand;
    std::deque<Task> tasks;

    void run() {
        post(strand, [this] { drain_loop(); });
    }

    void drain_loop() {
        if (tasks.empty()) {
            message("queue empty");
        } else {
            tasks.front()(); // invoke task
            tasks.pop_front();
            run();
        }
    }
};

现在我们可以安全地选择是否要在线程上下文中使用它——因为所有队列操作都在一个链上。

int main() {
    thread_pool io; // or io_context io;
    Queue tasks(io.get_executor());

    message("start");

    steady_timer t100ms(io, 100ms);
    t100ms.async_wait([&](auto ec) {
        message("t100ms " + ec.message());
    });

    steady_timer t80ms(io, 80ms);
    t80ms.async_wait([&](auto ec) {
        message("t80ms " + ec.message());
        tasks.add(make_task("task A", 40ms));
        tasks.add(make_task("task B", 40ms));
    });

    io.join(); // or io.run()
}

使用thread_pool io;

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 100ms:       t100ms Success
at 120ms:       task B
at 160ms:       queue empty

使用io_context io;(或thread_pool io(1);当然):

at 0ms: start
at 80ms:        t80ms Success
at 80ms:        task A
at 120ms:       task B
at 160ms:       t100ms Success
at 160ms:       queue empty

推荐阅读