recursion - Cycle detected in recursive async function
问题描述
I'm trying to find files with an extension in a dir recursively, and here's my current implementation:
use futures::future::BoxFuture;
use futures::Stream;
use std::io::ErrorKind;
use std::pin::Pin;
use std::result;
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>> + Send + Sync + 'static>>;
async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
let mut dir = read_dir(path).await?;
let mut files: Vec<String> = Vec::new();
while let Some(child) = dir.next_entry().await? {
if let Some(child_path) = child.path().to_str() {
if child.metadata().await?.is_dir() {
tokio::spawn(async {
one_level(child_path.to_string(), tx.clone(), ext.clone()).await;
});
} else {
if child_path.ends_with(&ext.clone()) {
files.push(child_path.to_string())
}
}
} else {
tx.send(Err(std::io::Error::new(
ErrorKind::Other,
"Invalid path".to_string(),
)));
}
}
for file in files {
tx.send(Ok(file));
}
Ok(())
}
let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
tokio::spawn(async {
one_level(root_path, tx, ext).await;
});
Ok(Box::pin(ReceiverStream::new(rx)))
}
I don't quite understand why the compiler complains:
14 | async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
| ^^^^^^^^^^
|
note: ...which requires borrow-checking `list_all::{closure#0}::one_level`...
--> src/main.rs:14:5
.....
.....
= note: ...which requires evaluating trait selection obligation `for<'r, 's, 't0> {std::future::ResumeTy, &'r str, std::string::String, &'s tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, &'t0 std::string::String, impl futures::Future, ()}: std::marker::Send`...
= note: ...which again requires computing type of `list_all::{closure#0}::one_level::{opaque#0}`, completing the cycle
= note: cycle used when evaluating trait selection obligation `{std::future::ResumeTy, std::string::String, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, impl futures::Future, ()}: std::marker::Send`
Is it possible to define such a recursive function as async? Can I parallel the procedure of listing dir through tokio::spwan
and get speed up?
解决方案
rust 异步函数被编译为状态机,因此有一个异步函数调用自己将需要状态机将自己嵌入到自己的定义中,这将是一个无限递归。
这是更好的解释here。Box
已链接文档中解释的工作方法是通过(BoxFuture
类型)和非异步函数引入间接:
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use std::io::ErrorKind;
use std::pin::Pin;
use std::result;
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;
type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>> + Send + Sync + 'static>>;
async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
tokio::spawn(async {
recursive(root_path, tx, ext).await.unwrap();
});
Ok(Box::pin(ReceiverStream::new(rx)))
}
fn recursive(
path: String,
tx: Sender<Result<String>>,
ext: String,
) -> BoxFuture<'static, Result<()>> {
async move {
let mut dir = read_dir(path).await?;
let mut files: Vec<String> = Vec::new();
while let Some(child) = dir.next_entry().await? {
match child.path().to_str() {
Some(child_path) => {
let metadata = child.metadata().await?;
if metadata.is_dir() {
let cp = child_path.to_owned();
let tx = tx.clone();
let ext = ext.clone();
tokio::spawn(async {
recursive(cp, tx, ext).await.unwrap();
});
} else {
if child_path.ends_with(&ext) {
files.push(child_path.to_owned())
}
}
}
None => {
tx.send(Err(std::io::Error::new(
ErrorKind::Other,
"Invalid path".to_string(),
)))
.await
.unwrap();
}
}
}
for file in files {
tx.send(Ok(file)).await.unwrap();
}
Ok(())
}
.boxed()
}
PS:我还修复了您的代码的其他一些问题,例如缺少await
调用tx.send()
-记住-期货仅在轮询时才执行工作!!!
推荐阅读
- azure - 如何创建用户用户定义函数在 kusto 中进行字符串连接
- for-loop - 在 Jekyll for 循环中访问嵌套数据
- django - 在上下文中使用大型生成器流式传输 Django 模板
- google-cloud-platform - 在没有任何组织级别权限的情况下获取 GCP 中给定项目的资产/资源列表
- azure-devops - 用于成功测试的 Azure Devops Pipelines 测试附件
- android - 如何在 Unity 混合应用程序中从 VR 模式转到 2D 模式?
- python-3.x - 如何使用 Python 从运行 OS 的应用程序中复制粘贴数据?
- javascript - 我怎样才能用javascript做日计时器?
- javascript - WordPress > HTML/PHP/JS > 更改特定页面上的页脚文本
- .htaccess - 地图服务器包装 cgi-bin/mapserv 和 map=mapfile.map 的问题