The following in translated from the Divide and Conquer example in the ZeroMQ guide.
module ZeroMQ
open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets
let parallel_task () =
let task_number = 100
let uri_source, uri_sink =
let uri = "ipc://parallel_task"
Path.Join(uri,"source"), Path.Join(uri,"sink")
printfn "%A, %A" uri_source uri_sink
let rnd = Random()
use source = new PushSocket(uri_source)
use sink = new PushSocket(uri_sink)
let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)
let ventilator_init () =
printf "Press enter when workers are ready.\n"
printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
Console.ReadLine() |> ignore
let ventilator_run () =
sink.SendFrame("0")
printf "Sending tasks to workers.\n"
Array.iter (string >> source.SendFrame) tasks
Thread.Sleep(1)
let worker i () =
printf "Starting worker %i\n" i
use source = new PullSocket(uri_source)
use sink = new PushSocket(uri_sink)
while true do
let msg = source.ReceiveFrameString()
printf "Worker %i received message.\n" i
//printf "%s.\n" msg
Thread.Sleep(int msg)
sink.SendFrame("")
let sink () =
use sink = new PullSocket(uri_sink)
let watch = Diagnostics.Stopwatch()
for i=1 to task_number do
let _ = sink.ReceiveFrameString()
if watch.IsRunning = false then watch.Start()
printf (if i % 10 = 0 then ":" else ".")
printf "\nTotal elapsed time: %A msec\n" watch.Elapsed
ventilator_init()
for i=1 to 4 do Task.Run (worker i) |> ignore
let t = Task.Run sink
ventilator_run()
t.Wait()
[<EntryPoint>]
let main argv =
parallel_task()
0
What happens here is that a single worker gets all the messages and none of the other threads get assigned any work. Why is this happening?
open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets
let parallel_task () =
let task_number = 100
let uri_source, uri_sink =
let uri = "ipc://parallel_task"
Path.Join(uri,"source"), Path.Join(uri,"sink")
let ventilator () =
let rnd = Random()
use source = new PushSocket()
source.Bind(uri_source)
use sink = new PushSocket()
sink.Connect(uri_sink)
let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)
printf "Press enter when workers are ready.\n"
printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
Console.ReadLine() |> ignore
sink.SendFrame("0")
printf "Sending tasks to workers.\n"
Array.iter (string >> source.SendFrame) tasks
Thread.Sleep(1)
let worker i () =
printf "Starting worker %i\n" i
use source = new PullSocket()
source.Connect(uri_source)
use sink = new PushSocket()
sink.Connect(uri_sink)
while true do
let msg = source.ReceiveFrameString()
printf "Worker %i received message.\n" i
Thread.Sleep(int msg)
sink.SendFrame("")
let sink () =
use sink = new PullSocket()
sink.Bind(uri_sink)
let watch = Diagnostics.Stopwatch()
for i=1 to task_number do
let _ = sink.ReceiveFrameString()
if watch.IsRunning = false then watch.Start()
printf (if i % 10 = 0 then ":" else ".")
printf "\nTotal elapsed time: %A msec\n" watch.Elapsed
Task.Run ventilator |> ignore
for i=1 to 4 do Task.Run (worker i) |> ignore
Task.Run(sink).Wait()
Here is the cleaned up version of the above that works properly. I had to explicitly note what is a bind and what is a connection. Thank you @somdoron for the hint.