首页 > 解决方案 > F# MailboxProcessor 限制并行度

问题描述

我是 F# 的新手,正在尝试使用 MailboxProcessor 来确保状态更改是单独完成的。

简而言之,我将操作(描述状态更改的不可变对象)发布到 MailboxProcessor,在递归函数中我读取消息并生成新状态(即在下面的示例中将项目添加到集合中)并将该状态发送到下一次递归。

open System

type AppliationState =
    {
        Store : string list
    }
    static member Default = 
        {
            Store = List.empty
        }
    member this.HandleAction (action:obj) =
        match action with
        | :? string as a -> { this with Store = a :: this.Store }
        | _ -> this

type Agent<'T> = MailboxProcessor<'T>     

[<AbstractClass; Sealed>]
type AppHolder private () =
    static member private Processor = Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive()
                let s' = s.HandleAction action
                Console.WriteLine("{s: " + s.Store.Length.ToString() + " s': " + s'.Store.Length.ToString())
                return! loop s'
                }
        loop AppliationState.Default)

    static member HandleAction (action:obj) =
        AppHolder.Processor.Post action

[<EntryPoint>]
let main argv =
    AppHolder.HandleAction "a"
    AppHolder.HandleAction "b"
    AppHolder.HandleAction "c"
    AppHolder.HandleAction "d"

    Console.ReadLine()
    0 // return an integer exit code

预期输出为:

s: 0 s': 1
s: 1 s': 2
s: 2 s': 3
s: 3 s': 4  

我得到的是:

s: 0 s': 1
s: 0 s': 1
s: 0 s': 1
s: 0 s': 1

阅读 MailboxProcessor 的文档并对其进行谷歌搜索,我的结论是它是一个消息队列,由“单线程”处理,而不是看起来它们都是并行处理的。

我在这里完全不在场吗?

标签: multithreadingf#agentmailboxprocessor

解决方案


问题是您认为AppHolder.Processor每次都是同一个对象,但实际上每次都是不同的 MailboxProcessor。我将您的 AppHolder 代码更改为以下内容:

[<AbstractClass; Sealed>]
type AppHolder private () =
    static member private Processor =
        printfn "Starting..."
        Agent.Start(fun inbox ->
        let rec loop (s : AppliationState) =
            async {
                let! action = inbox.Receive()
                let s' = s.HandleAction action
                printfn "{s: %A s': %A}" s s'
                return! loop s'
                }
        loop AppliationState.Default)

    static member HandleAction (action:obj) =
        AppHolder.Processor.Post action

我所做的唯一更改是简化使用 Console.WriteLine 调用printfn并获得更多调试细节,并添加将在 MailboxProcessor 构建和启动之前立即执行%A的单个调用。printfn "Starting..."我得到的输出是:

Starting...
Starting...
Starting...
Starting...
{s: {Store = [];} s': {Store = ["b"];}}
{s: {Store = [];} s': {Store = ["d"];}}
{s: {Store = [];} s': {Store = ["c"];}}
{s: {Store = [];} s': {Store = ["a"];}}

请注意,该printfn "Starting..."行已经执行了四次。

这吸引了很多 F# 新手:member关键字定义了一个属性,而不是一个字段。每次评估属性时,都会重新评估该属性的主体。所以每次访问AppHolder.Processor时,都会得到一个新的 MailboxProcessor。有关更多详细信息,请参阅https://docs.microsoft.com/en-us/dotnet/fsharp/language-reference/members/properties

您可能想要的是以下内容:

[<AbstractClass; Sealed>]
type AppHolder private () =
    static let processor =
        printfn "Starting..."
        Agent.Start(fun inbox ->
            // ...
        )

    static member HandleAction (action:obj) =
        processor.Post action

推荐阅读