
How to create channels in loop?


I am learning concurrency in go and how it works.

What am I trying to do?

  • Loop through slice of data
  • Create struct for required/needed data
  • Create channel for that struct
  • Call the worker func in a goroutine and pass that channel to it
  • Using data from channel do some processing
  • Set the processed output back into channel
  • Wait in main thread to get output from all the channels which we kicked off

Code Which I tried

    package main

    import (
        "fmt"
        "github.com/pkg/errors"
        "time"
    )

    type subject struct {
        Name string
        Class string
        StartDate time.Time
        EndDate time.Time
    }

    type workerData struct {
        Subject string
        Class string
        Result string
        Error error
    }

    func main () {

        // Creating test data
        var subjects []subject
        // The layout string must be written with Go's reference date (year 2006)
        st,_ := time.Parse("01/02/2006","01/01/2015")
        et,_ := time.Parse("01/02/2006","01/01/2016")
        s1 := subject{Name:"Math", Class:"3", StartDate:st,EndDate:et }
        s2 := subject{Name:"Geo", Class:"3", StartDate:st,EndDate:et }
        s3 := subject{Name:"Bio", Class:"3", StartDate:st,EndDate:et }
        s4 := subject{Name:"Phy", Class:"3", StartDate:st,EndDate:et }
        s5 := subject{Name:"Art", Class:"3", StartDate:st,EndDate:et }
        subjects = append(subjects, s1)
        subjects = append(subjects, s2)
        subjects = append(subjects, s3)
        subjects = append(subjects, s4)
        subjects = append(subjects, s5)
        c := make(chan workerData) // I am sure this is not how I should be creating channel

        for i := 0 ; i< len(subjects) ; i++ {
            go worker(c)
        }

        for _, v := range subjects {
            // Setting required data in channel
            data := workerData{Subject:v.Name, Class:v.Class}

            // set the data and start the routine
            c <- data // I think this will update data for all the routines ? SO how should create separate channel for each routine

        }

        // I want to wait till all the routines set the data in channel and return the data from workers.
        for {
            select {
                case data := <- c :
                    fmt.Println(data)
            }
        }
    }

    func worker (c chan workerData) {
        data := <- c
        // This can be any processing
        time.Sleep(100 * time.Millisecond)
        if data.Subject != "Math" {
            data.Result = "Pass"
        } else {
            data.Error = errors.New("Subject not found")
        }
        fmt.Println(data.Subject)
        // returning processed data and error to channel
        c <- data
        // Rightfully this closes channel and here after I get error send on Closed channel.
        close(c)
    }

Playground link - https://play.golang.org/p/hs1-B1UR98r

Issue I am Facing

I am not sure how to create a different channel for each data item. The way I am currently doing it, I believe the channel data is updated for all goroutines. Is there a way to create a different channel for each data item in the loop and pass it to the goroutine, and then wait in the main goroutine to get the results back from all the channels?

Any pointers or help would be great. If anything is unclear, feel free to comment.


Solution

  • "// I think this will update data for all the routines ?"

    A channel (to simplify) is not a data structure for storing data.

    It is a structure for sending and receiving data between goroutines.

    As such, notice that your worker function both receives from and sends on the same channel within each goroutine instance. If you had only one instance of such a worker, this would deadlock (https://golang.org/doc/articles/race_detector.html): main would be blocked sending the second item while the worker is blocked sending its first result, and nobody would be left to receive.

    The code you posted may appear to work because you have many workers exchanging work with each other over the shared channel, but the program is still incorrect.

    Consequently, since a worker should not read from and write to the same channel, it needs a second, dedicated channel on which to send its results to other goroutines.
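
    A minimal sketch of that separation, assuming a hypothetical worker named grade that takes a subject name and returns a string (neither is from your code). The directional channel types make the compiler reject a send on in or a receive on out:

```go
package main

import "fmt"

// grade is a hypothetical worker: it only receives from in and only sends on out.
func grade(in <-chan string, out chan<- string) {
	for subject := range in {
		out <- subject + ": Pass"
	}
}

func main() {
	in := make(chan string)
	out := make(chan string)
	go grade(in, out)

	for _, subject := range []string{"Geo", "Bio"} {
		in <- subject      // hand one item to the worker
		fmt.Println(<-out) // read its result from the other channel
	}
	close(in) // lets the worker's range loop end
}
```

    Because the input and the output travel on different channels, a worker can never pick up another worker's result by mistake.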

    // I want to wait till all the routines set the data in channel and return the data from workers.

    This is part of the synchronization required to ensure that a producer waits until all of its workers have finished their jobs before proceeding further (this blog post talks about it: https://medium.com/golangspec/synchronized-goroutines-part-i-4fbcdd64a4ec).
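
    One common way to do this waiting is sync.WaitGroup from the standard library. The sketch below is only an illustration: the runAll helper and the "processed" strings are made up, and each goroutine writes to its own slice slot so no further locking is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// runAll (hypothetical helper) starts one goroutine per name and
// returns only after all of them have finished.
func runAll(names []string) []string {
	done := make([]string, len(names))
	var wg sync.WaitGroup
	for i, name := range names {
		wg.Add(1) // register the goroutine before starting it
		go func(i int, name string) {
			defer wg.Done()
			done[i] = name + ": processed" // each goroutine owns slot i
		}(i, name)
	}
	wg.Wait() // blocks until Done has been called once per Add
	return done
}

func main() {
	for _, line := range runAll([]string{"Math", "Geo", "Bio"}) {
		fmt.Println(line)
	}
}
```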

    // Rightfully this closes channel and here after I get error send on Closed channel.

    Keep in mind that you have n worker goroutines executing concurrently. The first worker to reach the end of its function closes the channel, which makes any later send by another worker panic and falsely signals to main that the work is over.

    Normally, close is called on the writer side to indicate that no more data will be sent on the channel. Readers consume this signal to stop waiting on the channel.
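
    A small sketch of that convention, with hypothetical produce and drain functions. The single sender owns the close, and the reader uses the two-value receive form to notice it:

```go
package main

import "fmt"

// produce sends every value and then closes the channel.
// It is the only sender, so it is the one that closes.
func produce(values []string, out chan<- string) {
	defer close(out)
	for _, v := range values {
		out <- v
	}
}

// drain receives until the channel is closed and empty.
func drain(in <-chan string) []string {
	var got []string
	for {
		v, ok := <-in
		if !ok { // ok is false once the channel is closed and drained
			return got
		}
		got = append(got, v)
	}
}

func main() {
	ch := make(chan string)
	go produce([]string{"Math", "Geo"}, ch)
	fmt.Println(drain(ch))
}
```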

    As an example, let's review this loop:

        for {
            select {
                case data := <- c :
                    fmt.Println(data)
            }
        }
    

    It is bad, really bad.

    1. It is an infinite loop with no exit statement.
    2. The select is superfluous and contains no exit case; remember that a receive on a channel is already a blocking operation.
    3. It is a poor rewrite of a standard pattern provided by the language: the range loop over a channel.

    The range loop over a channel is written very simply:

        for data := range c {
            fmt.Println(data)
        }
    

    This pattern has one great advantage: it automatically detects a closed channel and exits the loop, so you iterate only over the data that was actually sent. It is also much more succinct.

    Also, your worker is awkward in that it reads and writes only one element before quitting. Spawning goroutines is cheap, but not free. You should always weigh the cost of asynchronous processing against the actual workload.

    Overall, your code should be closer to what is demonstrated here https://gobyexample.com/worker-pools
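
    Applied to your program, a sketch along those lines could look like the following. It is one possible arrangement, not the only one: the processAll helper, the worker count of 3, and the shortened sleep are assumptions, and it uses the standard errors package instead of github.com/pkg/errors so that it has no external dependency.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

type workerData struct {
	Subject string
	Class   string
	Result  string
	Error   error
}

// worker processes jobs until the jobs channel is closed.
func worker(jobs <-chan workerData, results chan<- workerData, wg *sync.WaitGroup) {
	defer wg.Done()
	for data := range jobs {
		time.Sleep(10 * time.Millisecond) // stand-in for real processing
		if data.Subject != "Math" {
			data.Result = "Pass"
		} else {
			data.Error = errors.New("subject not found")
		}
		results <- data
	}
}

// processAll (hypothetical helper) fans the subjects out to numWorkers
// goroutines and collects every result.
func processAll(subjects []string, numWorkers int) []workerData {
	jobs := make(chan workerData)
	results := make(chan workerData)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go worker(jobs, results, &wg)
	}

	// Feed jobs from a separate goroutine so the caller can read results meanwhile.
	go func() {
		for _, name := range subjects {
			jobs <- workerData{Subject: name, Class: "3"}
		}
		close(jobs) // the only sender closes: no more work
	}()

	// Close results once every worker has returned, which ends the range below.
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []workerData
	for data := range results {
		out = append(out, data)
	}
	return out
}

func main() {
	for _, data := range processAll([]string{"Math", "Geo", "Bio", "Phy", "Art"}, 3) {
		fmt.Printf("%+v\n", data)
	}
}
```

    The results arrive in whatever order the workers finish, so the output order can differ between runs.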