asynchronous - F# AsyncSeq - mke asyncSeq 如何按返回值的顺序迭代?
问题描述
我在使用 AsyncSeq 时遇到了一些问题,它并行运行一些任务,然后迭代那些并行任务的结果,一次只做一个。我认为这对于 AsyncSeq 来说应该是完美的,但是因为它以序列的初始顺序进行迭代,它不会在它们进入时执行任务。很难解释。看看这个小模型可能很容易:
//completes after the given time
let randomwait time=
async{
printfn "started waiting : %i"time
do! Async.Sleep(time*1000);
printfn "waited %i" time
return time
}
//Creates 10 tasks in decending order of time taken to complete: 10s,9s 8s, etc
let stream=
asyncSeq{
for i=10 downto 1 do
let waitTime= i
yield randomwait waitTime
}
let run =
let task=
stream
|> AsyncSeq.mapAsyncParallel id // This runs all our randomWait tasks at once
|>AsyncSeq. 1(fun time ->async{ printfn "printing for time : %i" time})
Async.RunSynchronously task
我希望代码在每次打印之间延迟一秒输出以下内容。
Printing for Time: 1
Printing for Time: 2
etc etc
但是,由于迭代顺序不是由前一个并行任务中的完成顺序设置的,因此结果是倒数的,并且在前 10 秒任务完成后立即打印。
Printing for Time: 10
Printing for Time: 9
etc etc
任何帮助将不胜感激。如有必要,我很乐意使用其他解决方案,任何允许并行然后一次迭代的解决方案。
解决方案
代码中重要的关键操作是mapAsyncParallel
. 这会遍历输入异步序列,启动所有任务,然后按启动顺序生成结果。
该操作不会等待所有任务完成,而是仅在产生所有 N-1 个早期任务的结果后才产生第 N 个任务的结果。
以下示例流比您的示例更好地演示了该行为:
let stream=
asyncSeq {
for waitTime = 5 downto 1 do
yield randomwait waitTime
for waitTime = 10 to 15 do
yield randomwait waitTime
}
如果您将此用作示例,您的代码将等待 5 秒,然后它会打印“printing for time” 5, 4, 3, 2, 1(因为它必须等待 5 秒直到第一个任务完成,同时, 剩下的 4 个已完成),但随后它将再等待 5 秒并打印“打印时间”6、等待 1 秒、打印 7、等待 1 秒、打印 8 等。
如果您替换mapAsyncParalle
为 just mapAsync
,则代码将按顺序运行任务并(按顺序)等待每个任务完成。然后您将不会同时发生任何事情,并且等待时间会更长。
做你想做的(我认为),最好的选择是从 using 切换AsyncSeq<T>
到 using Observable<T>
。异步序列是连续的并保留元素的顺序。Observable 不会这样做。使用FSharp.Control.Reactive库,您可以:
let task=
stream
|> AsyncSeq.toObservable
|> Observable.bind Observable.ofAsync
|> Observable.iter (fun time -> printfn "printing for time : %i" time)
Observable.wait task |> ignore
在这里,bind
操作需要一个 observable,对于每个产生的值,它都会启动一个新的 observable(在我们的例子中,它只产生一个结果),然后它会按照它们到达的顺序收集所有结果,所以你得到了结果1
首先,即使这是作为第五个元素开始的。
推荐阅读
- r - R中数据框中每行最大值的比率
- javascript - RxJs - 创建 Observable 的 Observable 数组的正确模式/方法是什么?
- r - nflscrapR 包的安装问题
- flutter - 如何在 Flutter 中共享卡片?
- c++ - C++ 中的 restrict、__restrict 和 _restrict_ 关键字有什么区别?
- flutter - 将文本编辑器转换为 html,然后打印为 pdf
- reactjs - 如何在没有 php artisan 预设的情况下使用 laravel api 部署 React 应用程序
- c++ - 尝试在结构中初始化联合时出错
- angular - 从 Angular 的单个组件中隐藏 app.component 文件的内容
- swift - 从主视图导航到详细视图时如何隐藏自定义选项卡视图?| SwiftUI 代码