f# - 为什么任务没有分配给所有工人?
问题描述
以下内容来自 ZeroMQ 指南中的分而治之示例。
module ZeroMQ
open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets
let parallel_task () =
let task_number = 100
let uri_source, uri_sink =
let uri = "ipc://parallel_task"
Path.Join(uri,"source"), Path.Join(uri,"sink")
printfn "%A, %A" uri_source uri_sink
let rnd = Random()
use source = new PushSocket(uri_source)
use sink = new PushSocket(uri_sink)
let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)
let ventilator_init () =
printf "Press enter when workers are ready.\n"
printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
Console.ReadLine() |> ignore
let ventilator_run () =
sink.SendFrame("0")
printf "Sending tasks to workers.\n"
Array.iter (string >> source.SendFrame) tasks
Thread.Sleep(1)
let worker i () =
printf "Starting worker %i\n" i
use source = new PullSocket(uri_source)
use sink = new PushSocket(uri_sink)
while true do
let msg = source.ReceiveFrameString()
printf "Worker %i received message.\n" i
//printf "%s.\n" msg
Thread.Sleep(int msg)
sink.SendFrame("")
let sink () =
use sink = new PullSocket(uri_sink)
let watch = Diagnostics.Stopwatch()
for i=1 to task_number do
let _ = sink.ReceiveFrameString()
if watch.IsRunning = false then watch.Start()
printf (if i % 10 = 0 then ":" else ".")
printf "\nTotal elapsed time: %A msec\n" watch.Elapsed
ventilator_init()
for i=1 to 4 do Task.Run (worker i) |> ignore
let t = Task.Run sink
ventilator_run()
t.Wait()
[<EntryPoint>]
let main argv =
parallel_task()
0
这里发生的是单个工作人员获取所有消息,而其他线程都没有分配任何工作。为什么会这样?
解决方案
open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets
let parallel_task () =
let task_number = 100
let uri_source, uri_sink =
let uri = "ipc://parallel_task"
Path.Join(uri,"source"), Path.Join(uri,"sink")
let ventilator () =
let rnd = Random()
use source = new PushSocket()
source.Bind(uri_source)
use sink = new PushSocket()
sink.Connect(uri_sink)
let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)
printf "Press enter when workers are ready.\n"
printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
Console.ReadLine() |> ignore
sink.SendFrame("0")
printf "Sending tasks to workers.\n"
Array.iter (string >> source.SendFrame) tasks
Thread.Sleep(1)
let worker i () =
printf "Starting worker %i\n" i
use source = new PullSocket()
source.Connect(uri_source)
use sink = new PushSocket()
sink.Connect(uri_sink)
while true do
let msg = source.ReceiveFrameString()
printf "Worker %i received message.\n" i
Thread.Sleep(int msg)
sink.SendFrame("")
let sink () =
use sink = new PullSocket()
sink.Bind(uri_sink)
let watch = Diagnostics.Stopwatch()
for i=1 to task_number do
let _ = sink.ReceiveFrameString()
if watch.IsRunning = false then watch.Start()
printf (if i % 10 = 0 then ":" else ".")
printf "\nTotal elapsed time: %A msec\n" watch.Elapsed
Task.Run ventilator |> ignore
for i=1 to 4 do Task.Run (worker i) |> ignore
Task.Run(sink).Wait()
这是上面正常工作的清理版本。我必须明确指出什么是绑定,什么是连接。谢谢@somdoron 的提示。
推荐阅读
- hyperledger-fabric - fabric 2.0 无法生成创世块,没有定义策略
- python - 我正在尝试使用模块 simplegui 在 CodeSkulptor 中制作一个带有两个输入字段的计算器。我不知道我做错了什么
- python - 使用字典映射 Pandas 列的最佳方法
- swift - 在自定义类 swift 中支持 for-each
- html - 使用 Ruby on Rails 从搜索结果中显示列表/或表格
- codenameone - 通过代号一的 ActionBar 文本颜色
- vb.net - 如何将用 ~ 分隔的文本文件解析为 VB.Net 中的数据表或数组
- google-analytics - BigQuery Google Analytics:访问一组页面或具有特定自定义维度值的会话数
- android - Kotlin 如何正确使用 Android Intent 保存文件
- java - 无法在可执行 jar 中加载资源