boost - 如何将 boost io_service 与优先级队列一起使用?
问题描述
我有一个有两个功能的程序。一个是循环计时器,另一个是接收一些套接字。
我发现,如果在计时器触发之前有多个包进入,boost 将运行所有套接字句柄,然后运行计时器句柄。
我写了一个简单的代码来模拟这个时间,如下所示:
#include <iostream>
#include <memory>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/asio/steady_timer.hpp>
std::string get_time()
{
struct timespec time_spec;
clock_gettime(CLOCK_REALTIME, &time_spec);
int h = (int)(time_spec.tv_sec / 60 / 60 % 24);
int m = (int)(time_spec.tv_sec / 60 % 60);
int s = (int)(time_spec.tv_sec % 60);
int ms = (int)(time_spec.tv_nsec / 1000);
char st[50];
snprintf(st, 50, "[%02d:%02d:%02d:%06d]", h, m, s, ms);
return std::string(st);
}
void fA()
{
std::cout << get_time() << " : fA()" << std::endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(40));
}
void fB()
{
std::cout << get_time() << " : fB()" << std::endl;
boost::this_thread::sleep(boost::posix_time::milliseconds(20));
}
int main(int argc, char *argv[])
{
boost::asio::io_service io;
std::shared_ptr<boost::asio::io_service::work> work = std::make_shared<boost::asio::io_service::work>(io);
std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io);
std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io);
std::cout << get_time() << " : start" << std::endl;
t100ms->expires_from_now(std::chrono::milliseconds(100));
t80ms->expires_from_now(std::chrono::milliseconds(80));
t100ms->async_wait([&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t100ms" << std::endl;
}
});
t80ms->async_wait([&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t80ms" << std::endl;
io.post(fA);
io.post(fB);
}
});
io.run();
return 0;
}
这段代码的结果是:
[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : fB()
[08:15:40:623186] : t100ms
但是,我想要的结果是:
[08:15:40:482721] : start
[08:15:40:562867] : t80ms
[08:15:40:562925] : fA()
[08:15:40:603037] : t100ms
[08:15:40:604037] : fB()
t100ms 可以在 fA 和 fB 之间运行,该时间08:15:40:582721
在开始后 100ms 处更接近正确的所需时间 []。
我找到了一个调用示例,它给出了优先级队列的示例。
并尝试通过将我的代码添加到此示例中来修改它。
...
timer.async_wait(pri_queue.wrap(42, middle_priority_handler));
std::shared_ptr<boost::asio::steady_timer> t100ms = std::make_shared<boost::asio::steady_timer>(io_service);
std::shared_ptr<boost::asio::steady_timer> t80ms = std::make_shared<boost::asio::steady_timer>(io_service);
std::cout << get_time() << " : start" << std::endl;
t100ms->expires_from_now(std::chrono::milliseconds(100));
t80ms->expires_from_now(std::chrono::milliseconds(80));
t100ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t100ms" << std::endl;
}
}));
t80ms->async_wait(pri_queue.wrap(100, [&](const boost::system::error_code &_error) {
if(_error.value() == boost::system::errc::errc_t::success) {
std::cout << get_time() << " : t80ms" << std::endl;
io_service.post(pri_queue.wrap(0, fA));
io_service.post(pri_queue.wrap(0, fB));
}
}));
while (io_service.run_one())
...
但是,结果仍然没有显示在我的脑海中。它如下所示:
[08:30:13:868299] : start
High priority handler
Middle priority handler
Low priority handler
[08:30:13:948437] : t80ms
[08:30:13:948496] : fA()
[08:30:13:988606] : fB()
[08:30:14:008774] : t100ms
我哪里错了?
解决方案
处理程序按照它们发布的顺序运行。
当 80 毫秒到期时,您会立即同时发布fA()
和fB()
。当然,它们将首先运行,因为 t100ms 仍在等待中。
这是您的示例,但已大大简化:
#include <iostream>
#include <boost/asio.hpp>
#include <thread>
using boost::asio::io_context;
using boost::asio::steady_timer;
using namespace std::chrono_literals;
namespace {
static auto now = std::chrono::system_clock::now;
static auto get_time = [start = now()]{
return "at " + std::to_string((now() - start)/1ms) + "ms:\t";
};
void message(std::string msg) {
std::cout << (get_time() + msg + "\n") << std::flush; // minimize mixing output from threads
}
auto make_task = [](auto name, auto duration) {
return [=] {
message(name);
std::this_thread::sleep_for(duration);
};
};
}
int main() {
io_context io;
message("start");
steady_timer t100ms(io, 100ms);
t100ms.async_wait([&](auto ec) {
message("t100ms " + ec.message());
});
steady_timer t80ms(io, 80ms);
t80ms.async_wait([&](auto ec) {
message("t80ms " + ec.message());
post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));
});
io.run();
}
印刷
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 120ms: task B
at 140ms: t100ms Success
一种方法
假设您真的想为操作计时,请考虑运行多个线程。有了这个三个字的变化,输出是:
at 1ms: start
at 81ms: t80ms Success
at 81ms: task A
at 82ms: task B
at 101ms: t100ms Success
要仍然序列化 A 和 B,请通过更改将它们发布在链上:
post(io, make_task("task A", 40ms));
post(io, make_task("task B", 20ms));
至
auto s = make_strand(io);
post(s, make_task("task A", 40ms));
post(s, make_task("task B", 20ms));
现在打印
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 100ms: t100ms Success
at 120ms: task B
(下面的完整列表)。
没有线程请
当您不想使用线程时(例如为了简单/安全),另一种方法确实需要一个队列。我会考虑简单地写出来:
struct Queue {
template <typename Ctx>
Queue(Ctx context) : strand(make_strand(context)) {}
void add(Task f) {
post(strand, [this, f=std::move(f)] {
if (tasks.empty())
run();
tasks.push_back(std::move(f));
});
}
private:
boost::asio::any_io_executor strand;
std::deque<Task> tasks;
void run() {
post(strand, [this] { drain_loop(); });
}
void drain_loop() {
if (tasks.empty()) {
message("queue empty");
} else {
tasks.front()(); // invoke task
tasks.pop_front();
run();
}
}
};
现在我们可以安全地选择是否要在线程上下文中使用它——因为所有队列操作都在一个链上。
int main() {
thread_pool io; // or io_context io;
Queue tasks(io.get_executor());
message("start");
steady_timer t100ms(io, 100ms);
t100ms.async_wait([&](auto ec) {
message("t100ms " + ec.message());
});
steady_timer t80ms(io, 80ms);
t80ms.async_wait([&](auto ec) {
message("t80ms " + ec.message());
tasks.add(make_task("task A", 40ms));
tasks.add(make_task("task B", 40ms));
});
io.join(); // or io.run()
}
使用thread_pool io;
:
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 100ms: t100ms Success
at 120ms: task B
at 160ms: queue empty
使用io_context io;
(或thread_pool io(1);
当然):
at 0ms: start
at 80ms: t80ms Success
at 80ms: task A
at 120ms: task B
at 160ms: t100ms Success
at 160ms: queue empty
推荐阅读
- python - 使用 FilePond(React 组件)将图像上传到 Flask 服务器
- vue.js - 提高 Vue.js 应用程序的性能
- permissions - 无法通过 Azure Dev Ops 代理共享文件夹
- r - 从 R 中的图像 URL 异步下载图像
- html - 使用节点 js 在托管服务器上的 html 文件中自动填充某些参数
- django - Django中的聚合
- python - Platform.system() 给我'str object has no attribute'system'
- javascript - 在点击链接之前,如何将用户返回到他所在页面的上一个位置?
- python - 为什么这段代码不会删除我在 Python 中的反斜杠?
- python-2.7 - 无法更新更改 GPIO 引脚的图像