首页 > 解决方案 > 如何使用 Tokio 生成许多可取消的计时器?

问题描述

如何使用 Tokio 实现固定数量的定时器,这些定时器会在线程间定期重置和取消?当计时器到期时,将执行回调。

一个类似于 Go 的 APItime.AfterFunc基本上是我想要的:

package main

import (
    "fmt"
    "time"
)

func main() {
    t := time.AfterFunc(time.Hour, func() {
        // happens every 2 seconds with 1 second delay
        fmt.Println("fired")
    })

    for {
        t.Reset(time.Second)
        time.Sleep(time.Second * 2)
    }
}

我发现实现(足够)类似API的唯一板条箱是计时器,它以非常幼稚的方式通过产生2个线程来实现。当定时器经常被重置时,这很快就会变得令人望而却步。

显而易见的答案是使用 Tokio,问题是如何优雅地做到这一点。

一种选择是在每次更新计时器时生成一个新的绿色线程,并使用原子取消前一个计时器,方法是在这个原子上调节回调的执行,例如这个伪 Rust:

tokio::run({
    // for every timer spawn with a new "cancel" atomic
    tokio::spawn({
        Delay::new(Instant::now() + Duration::from_millis(1000))
            .map_err(|e| panic!("timer failed; err={:?}", e))
            .and_then(|_| {
                if !cancelled.load(Ordering::Acquire) {
                    println!("fired");
                }
                Ok(())
            })
    })
})

问题是我维护已经取消的计时器的状态,可能持续几分钟。此外,它看起来并不优雅。

此外tokio::time::Delaytokio::time::DelayQueue似乎也适用。特别是,通过Key从“插入”返回的引用定时器来重置和取消定时器的能力。

目前还不清楚如何在多线程应用程序中使用这个库,即:

返回值表示插入,并用于删除和重置的参数。请注意,Key 是令牌,一旦从队列中删除值,则可以重用,方法是在到达时调用 poll 或调用 remove。此时,调用者必须注意不要再次使用返回的 Key,因为它可能会引用队列中的不同项目。

这将在通过其键取消计时器的任务与从流中消耗计时器事件的任务之间创建竞争条件DelayQueue- 导致恐慌或取消不相关的计时器。

标签: timerrustrust-tokio

解决方案


您可以将Selectfutures-rs 中的组合器与 Tokio 一起使用。它返回第一个完成的未来的结果,然后忽略/停止轮询另一个。

作为第二个未来,我们可以使用一个接收器oneshot::channel来创建一个信号来完成我们的组合器未来。

use futures::sync::oneshot;
use futures::*;
use std::thread;
use std::time::{Duration, Instant};
use tokio::timer::Delay;

fn main() {
    let (interrupter, interrupt_handler) = oneshot::channel::<()>();

    //signal to cancel delayed call
    thread::spawn(move || {
        thread::sleep(Duration::from_millis(500)); //increase this value more than 1000ms to see is delayed call is working or not.
        interrupter
            .send(())
            .expect("Not able to cancel delayed future");
    });

    let delayed = Delay::new(Instant::now() + Duration::from_millis(1000))
        .map_err(|e| panic!("timer failed; err={:?}", e))
        .and_then(|_| {
            println!("Delayed Call Executed!");

            Ok(())
        });

    tokio::run(delayed.select(interrupt_handler).then(|_| Ok(())));
}

操场


推荐阅读