首页 > 解决方案 > 如何仅使用标准 Rust 库在循环间隔上运行一组函数而不同时运行相同的函数?

问题描述

我想使用 Rust 创建一个简单的调度程序,以便在定义的时间运行多个并发函数,但如果它们还没有完成就不要开始更多。

例如,如果定义的时间间隔是一秒,那么调度程序应该运行这些函数,并且如果之前的函数没有返回,则不要启动更多。目标是防止多次运行相同的功能。

我用 Go创建了一个工作示例,如下所示:

package main

import (
    "fmt"
    "sync"
    "time"
)

func myFunc(wg *sync.WaitGroup) {
    fmt.Printf("now: %+s\n", time.Now())
    time.Sleep(3 * time.Second)
    wg.Done()
}

func main() {
    quit := make(chan bool)

    t := time.NewTicker(time.Second)
    go func() {
        for {
            select {
            case <-t.C:
                var wg sync.WaitGroup
                for i := 0; i <= 4; i++ {
                    wg.Add(1)
                    go myFunc(&wg)
                }
                wg.Wait()
                fmt.Printf("--- done ---\n\n")
            case <-quit:
                return
            }
        }
    }()

    <-time.After(time.Minute)
    close(quit)
}

NewTicker因为我在 Rust 标准库中没有找到类似 Go 的东西,所以我使用了Tokio想出了这个

extern crate futures;
extern crate tokio;

use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;

fn main() {
    let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
        .for_each(|interval| {
            println!("Interval: {:?}", interval);
            for i in 0..5 {
                tokio::spawn(lazy(move || {
                    println!("I am i: {}", i);
                    thread::sleep(time::Duration::from_secs(3));
                    Ok(())
                }));
            }
            Ok(())
        })
        .map_err(|e| panic!("interval errored; err={:?}", e));

    tokio::run(task);
}

这种方法的问题是任务不会等待调用以前的函数,因此无论以前是否运行这些函数都会重新启动,我在这里缺少类似 Go 的sync.WaitGroup. 什么可以用来实现与工作示例中相同的结果?

仅使用标准库是否可以实现这一点?这主要是出于学习目的,可能有一种非常简单的方法可以避免额外的复杂性。

最后,我想通过 HTTP 定期监视一些站点(仅获取返回的状态代码),但在获得所有响应之前不要再次查询所有站点。

标签: rust

解决方案


既然你想要并发并且只会使用标准库,那么你基本上必须使用线程。

在这里,我们为调度程序循环的每次迭代为每个函数启动一个线程,允许它们并行运行。然后我们等待所有函数完成,防止同时运行相同的函数两次。

use std::{
    thread,
    time::{Duration, Instant},
};

fn main() {
    let scheduler = thread::spawn(|| {
        let wait_time = Duration::from_millis(500);

        // Make this an infinite loop
        // Or some control path to exit the loop
        for _ in 0..5 {
            let start = Instant::now();
            eprintln!("Scheduler starting at {:?}", start);

            let thread_a = thread::spawn(a);
            let thread_b = thread::spawn(b);

            thread_a.join().expect("Thread A panicked");
            thread_b.join().expect("Thread B panicked");

            let runtime = start.elapsed();

            if let Some(remaining) = wait_time.checked_sub(runtime) {
                eprintln!(
                    "schedule slice has time left over; sleeping for {:?}",
                    remaining
                );
                thread::sleep(remaining);
            }
        }
    });

    scheduler.join().expect("Scheduler panicked");
}

fn a() {
    eprintln!("a");
    thread::sleep(Duration::from_millis(100))
}
fn b() {
    eprintln!("b");
    thread::sleep(Duration::from_millis(200))
}

您还可以使用 aBarrier在线程中启动每个函数一次,然后在执行结束时同步所有函数:

use std::{
    sync::{Arc, Barrier},
    thread,
    time::Duration,
};

fn main() {
    let scheduler = thread::spawn(|| {
        let barrier = Arc::new(Barrier::new(2));

        fn with_barrier(barrier: Arc<Barrier>, f: impl Fn()) -> impl Fn() {
            move || {
                // Make this an infinite loop
                // Or some control path to exit the loop
                for _ in 0..5 {
                    f();
                    barrier.wait();
                }
            }
        }

        let thread_a = thread::spawn(with_barrier(barrier.clone(), a));
        let thread_b = thread::spawn(with_barrier(barrier.clone(), b));

        thread_a.join().expect("Thread A panicked");
        thread_b.join().expect("Thread B panicked");
    });

    scheduler.join().expect("Scheduler panicked");
}

fn a() {
    eprintln!("a");
    thread::sleep(Duration::from_millis(100))
}
fn b() {
    eprintln!("b");
    thread::sleep(Duration::from_millis(200))
}

我个人不会使用这些解决方案中的任何一个。我会找到一个其他人编写并测试了我需要的代码的板条箱。

也可以看看:


推荐阅读