
Get responses from multiple goroutines into an array


I need to collect the responses from multiple goroutines into an array. I know that channels can be used for this, but I am not sure how to make sure that all goroutines have finished processing their results, so I am using a WaitGroup.

Code

func main() {
  log.Info("Collecting ints")
  var wg sync.WaitGroup
  var results []int32 // never filled: the return values below are discarded
  for _, broker := range e.BrokersByBrokerID {
      wg.Add(1)
      go getInt32(&wg, broker)
  }
  wg.Wait()
  log.Info("Collected")
}

func getInt32(wg *sync.WaitGroup, broker *sarama.Broker) (int32, error) {
  defer wg.Done()

  // Just to show that this method may just return an error and no int32
  err := broker.Open(config)
  if err != nil && err != sarama.ErrAlreadyConnected {
    return 0, fmt.Errorf("Cannot connect to broker '%v': %s", broker.ID(), err)
  }
  defer broker.Close()

  return 1003, nil
}

My question

How can I put all the int32 responses (each call may return an error instead) into my int32 array, while making sure that all goroutines have finished their work and returned either the error or the int?


Solution

  • I also think you need a channel here. Something like this should work:

    package main
    
    import (
        "fmt"
        "log"
        "sync"
    )
    
    var (
        BrokersByBrokerID = []int32{1, 2, 3}
    )
    
    type result struct {
        data string
        err string // kept as a string for the demo; in real code use the error type
    }
    
    func main()  {
        var wg sync.WaitGroup
        var results []result
        ch := make(chan result)
    
        for _, broker := range BrokersByBrokerID {
            wg.Add(1)
            go getInt32(ch, &wg, broker)
        }
    
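        // Close the channel once every worker has sent its result,
        // so that the range loop below terminates.
        go func() {
            wg.Wait()
            close(ch)
        }()
    
        // Collect in main: appending in a separate goroutine and reading
        // results right after close(ch) would be a data race.
        for v := range ch {
            results = append(results, v)
        }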
    
        log.Printf("collected %v", results)
    }
    
    func getInt32(ch chan result, wg *sync.WaitGroup, broker int32) {
        defer wg.Done()
    
        if broker == 1 {
            ch <- result{err: fmt.Sprintf("error: gor broker 1")}
            return
        }
    
        ch <- result{data: fmt.Sprintf("broker %d - ok", broker)}
    }
    

    The output will look like this (the order of the entries varies between runs):

    2019/02/05 15:26:28 collected [{broker 3 - ok } {broker 2 - ok } { error: gor broker 1}]
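
A variant closer to the original question, as a minimal sketch: each goroutine sends an int32 together with an error, and the caller splits them into two slices. The names fetch and collect are hypothetical, and fetch only simulates the broker call (it fails for ID 1, like the example above). The channel is buffered to the number of workers, so no send blocks and no separate closing goroutine is needed.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// result pairs a worker's value with its error, mirroring the
// (int32, error) return of getInt32 in the question.
type result struct {
	value int32
	err   error
}

// fetch is a hypothetical stand-in for the real broker call.
func fetch(id int32) (int32, error) {
	if id == 1 {
		return 0, fmt.Errorf("cannot connect to broker %d", id)
	}
	return 1000 + id, nil
}

// collect runs fetch once per ID and returns the values and the errors.
func collect(ids []int32) ([]int32, []error) {
	var wg sync.WaitGroup
	// Buffer sized to the number of workers, so a send never blocks.
	ch := make(chan result, len(ids))

	for _, id := range ids {
		wg.Add(1)
		go func(id int32) {
			defer wg.Done()
			v, err := fetch(id)
			ch <- result{value: v, err: err}
		}(id)
	}

	wg.Wait() // every result is now sitting in the buffer
	close(ch) // lets the range loop below terminate

	var values []int32
	var errs []error
	for r := range ch {
		if r.err != nil {
			errs = append(errs, r.err)
			continue
		}
		values = append(values, r.value)
	}
	// Sort because goroutine completion order is not deterministic.
	sort.Slice(values, func(i, j int) bool { return values[i] < values[j] })
	return values, errs
}

func main() {
	values, errs := collect([]int32{1, 2, 3})
	fmt.Println(values, errs)
}
```

The buffered channel trades a little memory for simpler control flow. With a very large number of workers, the unbuffered version above, which drains the channel while the workers are still running, may be the better fit.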