首页 > 解决方案 > 如果程序立即失败,MailboxProcessor 第一个循环将无法运行

问题描述

我有一个命令定期运行 SFTP 检查并将结果记录到文件中。

let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv = 
    try 
        sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
    with 
    | ex -> 
        ex.Message |> printerAgent.Post
        printfn "%s" ex.Message // <- NOTICE THIS LINE
    sw.Close()
    sw.Dispose()
0  

它循环一个MailboxProcessor

let printerAgent = MailboxProcessor.Start(fun inbox-> 
    // the message processing function
    let rec messageLoop() = async{        
        // read a message
        let! msg = inbox.Receive()
        // process a message
        sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), msg)
        printfn "%s" msg
        // loop to top
        return! messageLoop()  
        }
    // start the loop 
    messageLoop() 
    )

调用它来将消息写入日志

let sftpExample local host port username (password:string) =
    async {
        use client = new SftpClient(host, port, username, password)
        client.Connect()
        sprintf "Connected to %s\nroot dir list" host  |> printerAgent.Post
        do! downloadDir local client ""   
        sprintf "Done, disconnecting now" |> printerAgent.Post
        client.Disconnect()
    } |> Async.RunSynchronously

文件下载是异步的,以及相应的消息,但一切似乎都运行良好。

问题是 - 如果由于某些原因 sftp 连接立即失败,MailboxProcessor则没有时间记录异常消息。

我试图做的——这确实有效——是printfn "%s" ex.Message在最后添加一个:我只是想知道是否有人设想了一个更好的解决方案。

仅供参考,完整的代码在这个 gist中。

标签: asynchronousf#mailboxprocessor

解决方案


实际上,您希望程序等到 MailboxProcessor 处理完所有的消息队列后,程序才退出。您printfn "%s" ex.Message似乎正在工作,但不能保证工作:如果 MailboxProcessor 在其队列中有多个项目,则运行该printfn函数的线程可能会在 MailboxProcessor 的线程有时间处理其所有消息之前完成。

我建议的设计是将您的输入更改为printerAgentDU,如下所示:

type printerAgentMsg =
    | Message of string
    | Shutdown

然后,当您希望打印机代理完成发送其消息时,请在函数中使用MailboxProcessor.PostAndReply(并注意文档中的使用示例)并向其发送消息。请记住,MailboxProcessor 消息已排队:当它接收到消息时,它已经通过了队列中的其余消息。所以它处理消息所需要做的就是返回一个回复,而不是再次调用它的循环。并且因为您使用了而不是,所以 main 函数将阻塞,直到 MailboxProcessor 完成所有工作。(为了避免任何可能永远阻塞的情况,我建议在您的通话中设置 10 秒的超时时间;默认超时时间为 -1,表示永远等待)。mainShutdownShutdownShutdownunitPostAndReplyPostAndReplyAsyncPostAndReply

编辑:这是我的意思的一个例子(未经测试,使用风险自负):

type printerAgentMsg =
    | Message of string
    | Shutdown of AsyncReplyChannel<unit>

let printerAgent = MailboxProcessor.Start(fun inbox-> 
    // the message processing function
    let rec messageLoop() = async{        
        // read a message
        let! msg = inbox.Receive()
        // process a message
        match msg with
        | Message text ->
            sw.WriteLine("{0}: {1}", DateTime.UtcNow.ToShortTimeString(), text)
            printfn "%s" text
            // loop to top
            return! messageLoop()
        | Shutdown replyChannel ->
            replyChannel.Reply()
            // We do NOT do return! messageLoop() here
        }
    // start the loop 
    messageLoop() 
    )

let logPath = Path.Combine(config.["SharedFolder"],timestamp)
let sw = new StreamWriter(logPath,true)
//...
[<EntryPoint>]
let main argv = 
    try 
        sftpExample config.["SharedFolder"] config.["SFTPFolder"] 22 "usr" "pswd" |> ignore
    with 
    | ex -> 
        ex.Message |> Message |> printerAgent.Post
        printfn "%s" ex.Message // <- NOTICE THIS LINE
    printerAgent.PostAndReply( (fun replyChannel -> Shutdown replyChannel), 10000)  // Timeout = 10000 ms = 10 seconds
    sw.Close()
    sw.Dispose()

推荐阅读