rust - 带有多队列的 tokio 有时会挂起,有时会工作
问题描述
我正在尝试用 tokio 对 crate多队列Stream
进行基准测试,以通过制作可以迭代的 s来实现发布者/订阅者的某些东西。我不相信效率(我可能需要数十或数百个侦听器来过滤项目,并且单个发布者将每毫秒发布大约 10 条消息),所以我想在我承诺之前对方法进行基准测试它。但是,现在,我遇到了一个奇怪的错误,有时tokio::timer::Interval
似乎根本没有触发。
完整代码如下:
#![feature(test)]
extern crate futures;
extern crate multiqueue;
extern crate test;
extern crate tokio;
#[cfg(test)]
mod tests {
use super::*;
use futures::future::lazy;
use futures::sync::mpsc::{channel, Receiver, Sender};
use futures::{Async, Poll, Stream};
use futures::{Future, Sink};
use test::Bencher;
use tokio::timer::Interval;
#[bench]
fn bench_many(b: &mut Bencher) {
tokio::run(lazy(|| {
let (tx, rx) = multiqueue::mpmc_fut_queue(1000);
tokio::spawn(
Interval::new_interval(std::time::Duration::from_micros(100))
.take(100)
.map(|_| 100)
.map_err(|e| {
eprintln!("Got interval error = {:?}", e);
})
.fold(tx, |tx, num| {
println!("Sending {}", num);
tx.send(num).map_err(|e| println!("send err = {:?}", e))
})
.map(|_| ()),
);
for i in 0..3 {
println!("Starting");
let rx = rx.clone();
tokio::spawn(rx.for_each(move |num| {
println!("{} Got a num! {}", i, num);
Ok(())
}));
}
Ok(())
}));
}
}
我正在运行它cargo bench
。futures
在版本上"0.1"
,tokio
在版本上"0.1"
,multiqueue
在版本上"0.3"
。
有时,整个测试以许多“[0-2] Got a num! 100”和“Sending 100”的消息完成,但有时它会挂在中间(在几个“Sending”和“Got a”消息之后)或仅挂起 3 条“开始”消息。
我怀疑这可能是我可以同时运行的任务数量的问题tokio
,但我真的不明白为什么这会是我遇到的限制,因为我正在生成两种类型的任务时间经常给执行人。
我怎样才能使它更可靠?
解决方案
推荐阅读
- javascript - Can't pass var (which is a function) as argument in another function
- r - 总结后变异 - dplyr
- c# - 强制主线程在 RunWorkerCompletedEventArgs 上使用 Return
- javascript - 发生鼠标悬停事件时如何更改绘图图上的光标?
- manifest - 显示:“独立”PWA 不删除地址栏
- encoding - GitKraken 文件编码
- teamcity - Teamcity 不运行整个自定义脚本
- jquery - 背景图像出现在链接悬停
- ios - 总结 Swift 中 UITextField 中的每个数字
- jquery - 制表编辑器不会在响应模式下编辑折叠的字段