Search code examples
f#netmq

Why are the tasks not being distributed to all the workers?


The following is translated from the Divide and Conquer example in the ZeroMQ guide.

module ZeroMQ

open System
open System.IO
open System.Threading
open System.Threading.Tasks
open NetMQ
open NetMQ.Sockets

// Ventilator / worker / sink pipeline: a ventilator pushes `task_number`
// workloads on `uri_source`, four workers pull them, sleep for the requested
// number of milliseconds and push an empty frame to `uri_sink`, where a sink
// counts the frames and reports the elapsed time.
let parallel_task () =
    // Number of workloads generated by the ventilator and awaited by the sink.
    let task_number = 100
    // Endpoint strings for the task channel and the result channel.
    let uri_source, uri_sink = 
        let uri = "ipc://parallel_task"
        Path.Join(uri,"source"), Path.Join(uri,"sink")

    printfn "%A, %A" uri_source uri_sink

    let rnd = Random()
    // Ventilator-side sockets. NOTE(review): no explicit Bind/Connect is issued
    // anywhere in this function; whether each endpoint ends up bound or
    // connected is left entirely to the socket constructors - confirm.
    use source = new PushSocket(uri_source)
    use sink = new PushSocket(uri_sink)
    // One workload per task; each value is later used as a sleep duration in ms.
    let tasks = Array.init task_number (fun _ -> rnd.Next 100+1)

    // Prints the summed workload as a TimeSpan and blocks until Enter is pressed.
    let ventilator_init () =
        printf "Press enter when workers are ready.\n"
        printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
        Console.ReadLine() |> ignore

    // Sends a "0" frame to the sink socket, then every workload (as a string)
    // on the source socket, and finally sleeps for 1 ms.
    let ventilator_run () =
        sink.SendFrame("0")
        printf "Sending tasks to workers.\n"
        Array.iter (string >> source.SendFrame) tasks
        Thread.Sleep(1)

    // Worker number `i`: creates its own pull/push sockets (shadowing the outer
    // `source`/`sink`), then loops forever receiving a workload, sleeping for
    // that many milliseconds and pushing an empty frame to the sink endpoint.
    let worker i () =
        printf "Starting worker %i\n" i
        use source = new PullSocket(uri_source)
        use sink = new PushSocket(uri_sink)
        while true do
            let msg = source.ReceiveFrameString()
            printf "Worker %i received message.\n" i
            //printf "%s.\n" msg
            Thread.Sleep(int msg)
            sink.SendFrame("")

    // Result collector. From here on the name `sink` refers to this function;
    // `ventilator_run` above still closes over the push socket of the same name.
    // Receives `task_number` frames, starting the stopwatch after the first one,
    // and prints ":" for every tenth frame and "." for the others.
    let sink () =
        use sink = new PullSocket(uri_sink)
        let watch = Diagnostics.Stopwatch()
        for i=1 to task_number do
            let _ = sink.ReceiveFrameString()
            if watch.IsRunning = false then watch.Start()
            printf (if i % 10 = 0 then ":" else ".")
        printf "\nTotal elapsed time: %A msec\n" watch.Elapsed
    // Wait for the user, start four workers and the sink as background tasks,
    // distribute the workloads, then block until the sink has finished.
    ventilator_init()
    for i=1 to 4 do Task.Run (worker i) |> ignore
    let t = Task.Run sink
    ventilator_run()
    t.Wait()

/// Program entry point: runs the pipeline demo once and reports success (exit code 0).
/// The command-line arguments are not used.
[<EntryPoint>]
let main argv =
    do parallel_task ()
    0

What happens here is that a single worker gets all the messages and none of the other threads get assigned any work. Why is this happening?


Solution

  • open System
    open System.IO
    open System.Threading
    open System.Threading.Tasks
    open NetMQ
    open NetMQ.Sockets
    
    /// Runs the ZeroMQ "divide and conquer" pipeline inside one process:
    /// a ventilator pushes `task_number` workloads, four workers pull them,
    /// sleep for the requested number of milliseconds and push an empty result
    /// frame, and a sink collects the results and reports the elapsed time.
    /// Blocks until the sink has received every result. The workers loop forever
    /// and are left running as background tasks.
    let parallel_task () =
        let task_number = 100
        let uri_source, uri_sink =
            let uri = "ipc://parallel_task"
            Path.Join(uri,"source"), Path.Join(uri,"sink")

        /// Task generator: binds the task channel, connects to the sink, waits
        /// for the user to press Enter, signals start of batch to the sink and
        /// then pushes every workload.
        let ventilator () =
            let rnd = Random()
            // The ventilator is the stable end of the task channel, so it binds;
            // the workers connect to it.
            use source = new PushSocket()
            source.Bind(uri_source)
            use sink = new PushSocket()
            sink.Connect(uri_sink)
            // Each workload is a sleep duration in milliseconds.
            let tasks = Array.init task_number (fun _ -> rnd.Next(100) + 1)
            printf "Press enter when workers are ready.\n"
            printf "Total expected time: %A\n" (TimeSpan.FromMilliseconds(Array.sum tasks |> float))
            Console.ReadLine() |> ignore
            // Start-of-batch signal: the sink starts its clock when this arrives.
            sink.SendFrame("0")
            printf "Sending tasks to workers.\n"
            Array.iter (string >> source.SendFrame) tasks
            // Give the sockets time to deliver the queued frames before `use`
            // disposes them. The guide's ventilator sleeps one second here;
            // the previous 1 ms was a mistranslation of `sleep(1)`.
            Thread.Sleep(1000)

        /// Worker number `i`: pulls a workload, sleeps for that many
        /// milliseconds and pushes an empty frame to the sink. Never returns.
        let worker i () =
            printf "Starting worker %i\n" i
            use source = new PullSocket()
            source.Connect(uri_source)
            use sink = new PushSocket()
            sink.Connect(uri_sink)
            while true do
                let msg = source.ReceiveFrameString()
                printf "Worker %i received message.\n" i
                Thread.Sleep(int msg)
                sink.SendFrame("")

        /// Result collector: binds the result channel, waits for the
        /// start-of-batch frame, then times the arrival of `task_number` results.
        let sink () =
            use sink = new PullSocket()
            sink.Bind(uri_sink)
            // The first frame is the ventilator's start-of-batch signal, not a
            // result. Consume it separately so the loop below counts all
            // `task_number` worker results instead of stopping one short.
            let _ = sink.ReceiveFrameString()
            let watch = Diagnostics.Stopwatch.StartNew()
            for i=1 to task_number do
                let _ = sink.ReceiveFrameString()
                // Progress indicator: ":" for every tenth result, "." otherwise.
                printf (if i % 10 = 0 then ":" else ".")
            watch.Stop()
            // Report milliseconds so that the value matches the "msec" label.
            printf "\nTotal elapsed time: %d msec\n" watch.ElapsedMilliseconds

        Task.Run ventilator |> ignore
        for i=1 to 4 do Task.Run (worker i) |> ignore
        Task.Run(sink).Wait()
    

    Here is a cleaned-up version of the code above that works properly. The fix was to state explicitly which sockets bind to an endpoint and which connect to it. Thank you @somdoron for the hint.