asynchronous - 如何创建一个流,其中项目基于流先前返回的项目?
问题描述
我有一个futures::Stream
基于参数生成的函数。我想多次调用此函数并将流合并在一起。使事情复杂化的事实是,我想将流返回的值作为参数反馈给原始函数。
具体来说,我有一个函数可以将数字流返回到零:
fn numbers_down_to_zero(v: i32) -> impl Stream<Item = i32> {
stream::iter((0..v).rev())
}
我想从 5 开始调用这个函数。还应该为返回的每个奇数调用该函数。总调用集numbers_down_to_zero
将是:
numbers_down_to_zero(5);
numbers_down_to_zero(3);
numbers_down_to_zero(1);
numbers_down_to_zero(1);
产生总流量
4
3
2
1
0
2
1
0
0
0
有什么技术可以做到这一点?
解决方案
你可以用unfold
. 您将有一个“状态”结构,它既保留“基本流”(在这种情况下倒数到零)和将产生新流的项目列表,并将其用作unfold
保持状态的参数,而展开。
这样编译器就不必对生命周期所有权进行推理,因为async
每次调用闭包时状态都可以移动到块中。
/// Base stream (counting down to zero).
fn f(n: i32) -> impl Stream<Item = i32> {
stream::iter((0..n).rev())
}
/// "Recursive" stream
fn g(n: i32) -> impl Stream<Item = i32> {
/// Helper struct to keep state while unfolding
struct StreamState<S> {
inner_stream: S,
item_queue: VecDeque<i32>,
}
// Build helper struct
let state = StreamState {
inner_stream: f(n),
item_queue: VecDeque::new(),
};
// Unfold with state
stream::unfold(state, |mut state| async move {
loop {
if let Some(item) = state.inner_stream.next().await {
// Iterate inner stream, and potentially push item to queue
if item % 2 == 1 {
state.item_queue.push_front(item);
}
break Some((item, state));
} else if let Some(item) = state.item_queue.pop_back() {
// If inner stream is exhausted, produce new stream from queue
// and repeat loop
state.inner_stream = f(item);
} else {
// If queue is empty, we are done
break None;
}
}
})
}
StreamExt::next
要求内部流实现Unpin
,因此它不能用于任意流。您总是可以使用Box::pin(stream)
,因为Pin<Box<T>>
isUnpin
并实现Stream
if T: Stream
。
推荐阅读
- amazon-ec2 - scp 仅在手动设置服务器指纹后才起作用
- macos - 如何使用鼠标从 tmux 复制到 OSX 剪贴板
- webpack - 带有 publicPath 的 Webpack 资产生成器不起作用?
- unix - 如何修改默认文本背景颜色?
- php - 在本地机器上没有使用 Laravel 8 写入日志文件
- sql-server - SQL Server Docker 容器能否在 Windows Server core 2022 上运行(“linux”不能在此平台上使用)
- macos - 恢复出厂设置mac终端
- r - R中的Web抓取以检测特定关键字
- glsl - 为什么方向矢量朝向相反的方向?
- azure-functions - Azure Static Web App + Azure Function + PuppeteerSharp 给出 UnauthorizedAccessException