rust - 如何仅使用标准 Rust 库在循环间隔上运行一组函数而不同时运行相同的函数?
问题描述
我想使用 Rust 创建一个简单的调度程序,以便在定义的时间运行多个并发函数,但如果它们还没有完成就不要开始更多。
例如,如果定义的时间间隔是一秒,那么调度程序应该运行这些函数,并且如果之前的函数没有返回,则不要启动更多。目标是防止多次运行相同的功能。
我用 Go创建了一个工作示例,如下所示:
package main
import (
"fmt"
"sync"
"time"
)
func myFunc(wg *sync.WaitGroup) {
fmt.Printf("now: %+s\n", time.Now())
time.Sleep(3 * time.Second)
wg.Done()
}
func main() {
quit := make(chan bool)
t := time.NewTicker(time.Second)
go func() {
for {
select {
case <-t.C:
var wg sync.WaitGroup
for i := 0; i <= 4; i++ {
wg.Add(1)
go myFunc(&wg)
}
wg.Wait()
fmt.Printf("--- done ---\n\n")
case <-quit:
return
}
}
}()
<-time.After(time.Minute)
close(quit)
}
NewTicker
因为我在 Rust 标准库中没有找到类似 Go 的东西,所以我使用了Tokio并想出了这个
extern crate futures;
extern crate tokio;
use futures::future::lazy;
use std::{thread, time};
use tokio::prelude::*;
use tokio::timer::Interval;
fn main() {
let task = Interval::new(time::Instant::now(), time::Duration::new(1, 0))
.for_each(|interval| {
println!("Interval: {:?}", interval);
for i in 0..5 {
tokio::spawn(lazy(move || {
println!("I am i: {}", i);
thread::sleep(time::Duration::from_secs(3));
Ok(())
}));
}
Ok(())
})
.map_err(|e| panic!("interval errored; err={:?}", e));
tokio::run(task);
}
这种方法的问题是任务不会等待调用以前的函数,因此无论以前是否运行这些函数都会重新启动,我在这里缺少类似 Go 的sync.WaitGroup
. 什么可以用来实现与工作示例中相同的结果?
仅使用标准库是否可以实现这一点?这主要是出于学习目的,可能有一种非常简单的方法可以避免额外的复杂性。
最后,我想通过 HTTP 定期监视一些站点(仅获取返回的状态代码),但在获得所有响应之前不要再次查询所有站点。
解决方案
既然你想要并发并且只会使用标准库,那么你基本上必须使用线程。
在这里,我们为调度程序循环的每次迭代为每个函数启动一个线程,允许它们并行运行。然后我们等待所有函数完成,防止同时运行相同的函数两次。
use std::{
thread,
time::{Duration, Instant},
};
fn main() {
let scheduler = thread::spawn(|| {
let wait_time = Duration::from_millis(500);
// Make this an infinite loop
// Or some control path to exit the loop
for _ in 0..5 {
let start = Instant::now();
eprintln!("Scheduler starting at {:?}", start);
let thread_a = thread::spawn(a);
let thread_b = thread::spawn(b);
thread_a.join().expect("Thread A panicked");
thread_b.join().expect("Thread B panicked");
let runtime = start.elapsed();
if let Some(remaining) = wait_time.checked_sub(runtime) {
eprintln!(
"schedule slice has time left over; sleeping for {:?}",
remaining
);
thread::sleep(remaining);
}
}
});
scheduler.join().expect("Scheduler panicked");
}
fn a() {
eprintln!("a");
thread::sleep(Duration::from_millis(100))
}
fn b() {
eprintln!("b");
thread::sleep(Duration::from_millis(200))
}
您还可以使用 aBarrier
在线程中启动每个函数一次,然后在执行结束时同步所有函数:
use std::{
sync::{Arc, Barrier},
thread,
time::Duration,
};
fn main() {
let scheduler = thread::spawn(|| {
let barrier = Arc::new(Barrier::new(2));
fn with_barrier(barrier: Arc<Barrier>, f: impl Fn()) -> impl Fn() {
move || {
// Make this an infinite loop
// Or some control path to exit the loop
for _ in 0..5 {
f();
barrier.wait();
}
}
}
let thread_a = thread::spawn(with_barrier(barrier.clone(), a));
let thread_b = thread::spawn(with_barrier(barrier.clone(), b));
thread_a.join().expect("Thread A panicked");
thread_b.join().expect("Thread B panicked");
});
scheduler.join().expect("Scheduler panicked");
}
fn a() {
eprintln!("a");
thread::sleep(Duration::from_millis(100))
}
fn b() {
eprintln!("b");
thread::sleep(Duration::from_millis(200))
}
我个人不会使用这些解决方案中的任何一个。我会找到一个其他人编写并测试了我需要的代码的板条箱。
也可以看看:
推荐阅读
- odoo-14 - 编译模板“web.ViewSwitcherButton”odoo 时生成的代码无效
- powershell - Bitlocker 和 Powershell
- reactjs - 如何在 react-azure-mp 中集成“onprogress”功能?
- kubernetes - 具有不同命名空间的 HPA
- cas - 应用程序未授权使用 CAS
- r - 在 R 中,如何找到矩阵中第一次出现和最后一次出现的值的行号?
- r - 如何直接在 purrr::accumulate2() 中编写自定义函数
- apple-m1 - 在 Apple M1 上安装 MPFR 3.X
- python - 如何比较数组和取反值?
- python - 部分重新索引行索引 Pandas DataFrame