timer - 如何使用 Tokio 生成许多可取消的计时器?
问题描述
如何使用 Tokio 实现固定数量的定时器,这些定时器会在线程间定期重置和取消?当计时器到期时,将执行回调。
一个类似于 Go 的 APItime.AfterFunc
基本上是我想要的:
package main
import (
"fmt"
"time"
)
func main() {
t := time.AfterFunc(time.Hour, func() {
// happens every 2 seconds with 1 second delay
fmt.Println("fired")
})
for {
t.Reset(time.Second)
time.Sleep(time.Second * 2)
}
}
我发现实现(足够)类似API的唯一板条箱是计时器,它以非常幼稚的方式通过产生2个线程来实现。当定时器经常被重置时,这很快就会变得令人望而却步。
显而易见的答案是使用 Tokio,问题是如何优雅地做到这一点。
一种选择是在每次更新计时器时生成一个新的绿色线程,并使用原子取消前一个计时器,方法是在这个原子上调节回调的执行,例如这个伪 Rust:
tokio::run({
// for every timer spawn with a new "cancel" atomic
tokio::spawn({
Delay::new(Instant::now() + Duration::from_millis(1000))
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(|_| {
if !cancelled.load(Ordering::Acquire) {
println!("fired");
}
Ok(())
})
})
})
问题是我维护已经取消的计时器的状态,可能持续几分钟。此外,它看起来并不优雅。
此外tokio::time::Delay
,tokio::time::DelayQueue
似乎也适用。特别是,通过Key
从“插入”返回的引用定时器来重置和取消定时器的能力。
目前还不清楚如何在多线程应用程序中使用这个库,即:
返回值表示插入,并用于删除和重置的参数。请注意,Key 是令牌,一旦从队列中删除值,则可以重用,方法是在到达时调用 poll 或调用 remove。此时,调用者必须注意不要再次使用返回的 Key,因为它可能会引用队列中的不同项目。
这将在通过其键取消计时器的任务与从流中消耗计时器事件的任务之间创建竞争条件DelayQueue
- 导致恐慌或取消不相关的计时器。
解决方案
您可以将Select
futures-rs 中的组合器与 Tokio 一起使用。它返回第一个完成的未来的结果,然后忽略/停止轮询另一个。
作为第二个未来,我们可以使用一个接收器oneshot::channel
来创建一个信号来完成我们的组合器未来。
use futures::sync::oneshot;
use futures::*;
use std::thread;
use std::time::{Duration, Instant};
use tokio::timer::Delay;
fn main() {
let (interrupter, interrupt_handler) = oneshot::channel::<()>();
//signal to cancel delayed call
thread::spawn(move || {
thread::sleep(Duration::from_millis(500)); //increase this value more than 1000ms to see is delayed call is working or not.
interrupter
.send(())
.expect("Not able to cancel delayed future");
});
let delayed = Delay::new(Instant::now() + Duration::from_millis(1000))
.map_err(|e| panic!("timer failed; err={:?}", e))
.and_then(|_| {
println!("Delayed Call Executed!");
Ok(())
});
tokio::run(delayed.select(interrupt_handler).then(|_| Ok(())));
}
推荐阅读
- python - 在 Schema 定义后注册一个 post_load 钩子?
- flutter - Flutter 文本在列表中没有被省略
- c# - IdentityServer4 500 内部错误,使用授权端点时缺少 EpochTimeExtensions
- python - 使用python从selenium chrome webdriver启动时如何在隐身模式下添加扩展
- vb.net - vb.net Postgresql 数据库,使用 DataTable 并在 Datagridview 中显示位图
- android - 图像未在慢速网络上显示
- sql - 时差之和 - Oracle SQL
- angular - 获取 HttpHeader 在不同角度组件中解析的对象
- python - 合并后无法重命名python pandas数据框中的列(内连接)
- python - 创建一个模型来分类一个句子是否合乎逻辑