首页 > 解决方案 > 如何创建一个流,其中项目基于流先前返回的项目?

问题描述

我有一个futures::Stream基于参数生成的函数。我想多次调用此函数并将流合并在一起。使事情复杂化的事实是,我想将流返回的值作为参数反馈给原始函数。

具体来说,我有一个函数可以将数字流返回到零:

fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
    stream::iter((0..v).rev())
}

我想从 5 开始调用这个函数。还应该为返回的每个奇数调用该函数。总调用集numbers_down_to_zero将是:

numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);

产生总流量

4
3
2
1
0
2
1
0
0
0

有什么技术可以做到这一点?

标签: asynchronousruststreamfuture

解决方案


你可以用unfold. 您将有一个“状态”结构,它既保留“基本流”(在这种情况下倒数到零)和将产生新流的项目列表,并将其用作unfold保持状态的参数,而展开。

这样编译器就不必对生命周期所有权进行推理,因为async每次调用闭包时状态都可以移动到块中。

/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
    stream::iter((0..n).rev())
}

/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
    /// Helper struct to keep state while unfolding
    struct StreamState<S> {
        inner_stream: S,
        item_queue: VecDeque<i32>,
    }

    // Build helper struct
    let state = StreamState {
        inner_stream: f(n),
        item_queue: VecDeque::new(),
    };

    // Unfold with state
    stream::unfold(state, |mut state| async move {
        loop {
            if let Some(item) = state.inner_stream.next().await {
                // Iterate inner stream, and potentially push item to queue
                if item % 2 == 1 {
                    state.item_queue.push_front(item);
                }
                break Some((item, state));
            } else if let Some(item) = state.item_queue.pop_back() {
                // If inner stream is exhausted, produce new stream from queue
                // and repeat loop
                state.inner_stream = f(item);
            } else {
                // If queue is empty, we are done
                break None;
            }
        }
    })
}

完整的游乐场示例

StreamExt::next要求内部流实现Unpin,因此它不能用于任意流。您总是可以使用Box::pin(stream),因为Pin<Box<T>>isUnpin并实现Streamif T: Stream


推荐阅读