首页 > 解决方案 > 如何更改已与另一个流组合的 Tokio 计时器流的持续时间?

问题描述

我有三个流:

  1. 计时器流(定义如下)
  2. 广播频道流(定义如下)
  3. 一个 WebSocket 流(此处未定义,但我认为没有必要。它基于 tokio-tungstenite)

我将这 3 个流与一个select.

我想要命令我发送到广播频道来改变计时器 - 一旦流融合在一起,我希望能够重新调整 tokio 计时器。

这是我的计时器流:

fn timer_stream(dur: u64) -> impl futures::stream::Stream<Item = Input> {
    let task_time = tokio::time::Instant::now();

    tokio::time::interval_at(task_time, Duration::from_secs(dur))
        .map(move |_| Input::Command_first(task_time))
}

我希望能够更改task_timedur

这就是我将流组合在一起的方式:

let mut websocket_timer = select(exchange_websocket_stream, timer_stream(3));
let mut combined = select(websocket_timer, command_receiver);

command_receiver使用tokio::sync::broadcast;如下定义:

let (tx_tcp_commands, mut rx_tcp_commands) = broadcast::channel::<Input>(16);
command_receiver = tx_tcp_commands.subscribe()

Input是:

enum Input {
    Command_second(tokio::time::Instant),
    Command_first(tokio::time::Instant),
    Thing,
}

然后我将上述内容放入tokio::main执行程序并执行

loop {
    match combined.next().await {
        None => break,
        Some(Input::Command(t)) => etc,
    }
}

这作为一般流工作,我可以匹配发送到的命令command_receiver,我只需要知道如何更改计时器参数。

该循环正在处理大量的 WebSocket 消息,因此它必须是高效的。

我怎样才能做到这一点?

标签: rustrust-tokio

解决方案


推荐阅读