首页 > 解决方案 > Cycle detected in recursive async function

问题描述

I'm trying to find files with an extension in a dir recursively, and here's my current implementation:

use futures::future::BoxFuture;
use futures::Stream;
use std::io::ErrorKind;
use std::pin::Pin;
use std::result;
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;

type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>> + Send + Sync + 'static>>;

async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
    async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
        let mut dir = read_dir(path).await?;
        let mut files: Vec<String> = Vec::new();

        while let Some(child) = dir.next_entry().await? {
            if let Some(child_path) = child.path().to_str() {
                if child.metadata().await?.is_dir() {
                    tokio::spawn(async {
                        one_level(child_path.to_string(), tx.clone(), ext.clone()).await;
                    });
                } else {
                    if child_path.ends_with(&ext.clone()) {
                        files.push(child_path.to_string())
                    }
                }
            } else {
                tx.send(Err(std::io::Error::new(
                    ErrorKind::Other,
                    "Invalid path".to_string(),
                )));
            }
        }

        for file in files {
            tx.send(Ok(file));
        }
        Ok(())
    }

    let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
    tokio::spawn(async {
        one_level(root_path, tx, ext).await;
    });
    Ok(Box::pin(ReceiverStream::new(rx)))
}

I don't quite understand why the compiler complains:

14 |     async fn one_level(path: String, tx: Sender<Result<String>>, ext: String) -> Result<()> {
   |                                                                                  ^^^^^^^^^^
   |
note: ...which requires borrow-checking `list_all::{closure#0}::one_level`...
  --> src/main.rs:14:5
.....
.....
   = note: ...which requires evaluating trait selection obligation `for<'r, 's, 't0> {std::future::ResumeTy, &'r str, std::string::String, &'s tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, &'t0 std::string::String, impl futures::Future, ()}: std::marker::Send`...
   = note: ...which again requires computing type of `list_all::{closure#0}::one_level::{opaque#0}`, completing the cycle
   = note: cycle used when evaluating trait selection obligation `{std::future::ResumeTy, std::string::String, tokio::sync::mpsc::Sender<std::result::Result<std::string::String, std::io::Error>>, impl futures::Future, ()}: std::marker::Send`

Is it possible to define such a recursive function as async? Can I parallel the procedure of listing dir through tokio::spwan and get speed up?

标签: recursionrustasync-await

解决方案


rust 异步函数被编译为状态机,因此有一个异步函数调用自己将需要状态机将自己嵌入到自己的定义中,这将是一个无限递归。

这是更好的解释hereBox已链接文档中解释的工作方法是通过(BoxFuture类型)和非异步函数引入间接:

use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use std::io::ErrorKind;
use std::pin::Pin;
use std::result;
use tokio::fs::read_dir;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio_stream::wrappers::ReceiverStream;

type Result<T> = result::Result<T, std::io::Error>;
type FileNameStream = Pin<Box<dyn Stream<Item = Result<String>> + Send + Sync + 'static>>;

async fn list_all(root_path: String, ext: String) -> Result<FileNameStream> {
    let (tx, rx): (Sender<Result<String>>, Receiver<Result<String>>) = channel(2);
    tokio::spawn(async {
        recursive(root_path, tx, ext).await.unwrap();
    });
    Ok(Box::pin(ReceiverStream::new(rx)))
}

fn recursive(
    path: String,
    tx: Sender<Result<String>>,
    ext: String,
) -> BoxFuture<'static, Result<()>> {
    async move {
        let mut dir = read_dir(path).await?;
        let mut files: Vec<String> = Vec::new();

        while let Some(child) = dir.next_entry().await? {
            match child.path().to_str() {
                Some(child_path) => {
                    let metadata = child.metadata().await?;

                    if metadata.is_dir() {
                        let cp = child_path.to_owned();
                        let tx = tx.clone();
                        let ext = ext.clone();

                        tokio::spawn(async {
                            recursive(cp, tx, ext).await.unwrap();
                        });
                    } else {
                        if child_path.ends_with(&ext) {
                            files.push(child_path.to_owned())
                        }
                    }
                }
                None => {
                    tx.send(Err(std::io::Error::new(
                        ErrorKind::Other,
                        "Invalid path".to_string(),
                    )))
                    .await
                    .unwrap();
                }
            }
        }

        for file in files {
            tx.send(Ok(file)).await.unwrap();
        }
        Ok(())
    }
    .boxed()
}

PS:我还修复了您的代码的其他一些问题,例如缺少await调用tx.send()-记住-期货仅在轮询时才执行工作!!!


推荐阅读