rust - 有没有办法关闭`tokio::runtime::current_thread::Runtime`?
问题描述
我正在使用tokio::runtime::current_thread::Runtime
并且我希望能够运行未来并将反应器停止在同一个线程中。页面上的示例未显示如何停止运行时。有什么办法可以做到吗?
解决方案
当 future 完成时,运行时将自动关闭:
use std::time::Duration;
use tokio::time; // 0.2.21
#[tokio::main]
async fn main() {
time::delay_for(Duration::from_secs(2)).await;
eprintln!("future complete");
}
请参阅如何在稳定的 Rust 中同步返回在异步 Future 中计算的值?用于创建运行时的其他方式。
如果你需要取消一个future,你可以创建一些会导致futurepoll
成功的东西。我可能会使用频道和select
:
use futures::{channel::oneshot, future, FutureExt}; // 0.3.5
use std::time::Duration;
use tokio::{task, time}; // 0.2.21
#[tokio::main]
async fn main() {
let future = async {
time::delay_for(Duration::from_secs(3600)).await;
eprintln!("future complete");
};
let (cancel_tx, cancel_rx) = oneshot::channel();
let another_task = task::spawn(async {
eprintln!("Another task started");
time::delay_for(Duration::from_secs(2)).await;
eprintln!("Another task canceling the future");
cancel_tx.send(()).expect("Unable to cancel");
eprintln!("Another task exiting");
});
future::select(future.boxed(), cancel_rx).await;
another_task.await.expect("The other task panicked");
}
这是一个替代手动解决方案,它非常简单、蛮力,而且可能不是很有效:
use pin_project::pin_project; // 0.4.17
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{self, Context, Poll},
thread,
time::Duration,
};
use tokio::time; // 0.2.21
#[tokio::main]
async fn main() {
let future = async {
time::delay_for(Duration::from_secs(3600)).await;
eprintln!("future complete");
};
let (future, cancel) = Cancelable::new(future);
let another_thread = thread::spawn(|| {
eprintln!("Another thread started");
thread::sleep(Duration::from_secs(2));
eprintln!("Another thread canceling the future");
cancel();
eprintln!("Another thread exiting");
});
future.await;
another_thread.join().expect("The other thread panicked");
}
#[pin_project]
#[derive(Debug)]
struct Cancelable<F> {
#[pin]
inner: F,
info: Arc<Mutex<CancelInfo>>,
}
#[derive(Debug, Default)]
struct CancelInfo {
cancelled: bool,
task: Option<task::Waker>,
}
impl<F> Cancelable<F> {
fn new(inner: F) -> (Self, impl FnOnce()) {
let info = Arc::new(Mutex::new(CancelInfo::default()));
let cancel = {
let info = info.clone();
move || {
let mut info = info.lock().unwrap();
info.cancelled = true;
if let Some(waker) = info.task.take() {
waker.wake();
}
}
};
let me = Cancelable { inner, info };
(me, cancel)
}
}
impl<F> Future for Cancelable<F>
where
F: Future<Output = ()>,
{
type Output = ();
fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
let mut info = this.info.lock().unwrap();
if info.cancelled {
Poll::Ready(())
} else {
let r = this.inner.poll(ctx);
if r.is_pending() {
info.task = Some(ctx.waker().clone());
}
r
}
}
}
也可以看看:
推荐阅读
- javascript - javascript中嵌套函数中的范围变量
- php - 不确定为什么我的端点 URL 在发出 POST 请求时抛出 404 错误
- java - 使用 marvin gaussianBlur 模糊图像
- sql - 显示顺序扫描结果而不是索引
- animation - Xamarin 动画导致我的应用程序跳过许多帧
- android - MotionLayout 导致 RecyclerView 项目被截断
- voip - 来电视频通话通知
- javascript - 从 json 中查找 key 在另一个变量中的值
- swift - SwiftUI 背景修改器不适用于导航工具栏?
- tensorflow - TensorFlow.js 收集字符串张量