
How to efficiently parallelize array list and control the parallelism?


I have an array of resource IDs that I need to process in parallel: generate a URL for each resource and store it in a map whose key is the resourceId and whose value is the URL.

The code below does the job, but I am not sure it is the right way to do it. I am using sizedwaitgroup to bound the parallelism over the resourceId list, and a mutex to guard writes to the map. I suspect that combining the lock with sizedwaitgroup hurts performance.

What is the most efficient way to do this? Should I use channels? I want to control the degree of parallelism rather than start one goroutine per entry in the resourceId list. If URL generation fails for a resourceId, I want to log an error for that resourceId without disrupting the other goroutines that are generating URLs for the remaining resourceIds.

For example: if there are 10 resources and 2 fail, log an error for those 2, and the map should have entries for the remaining 8.

// run at most 20 goroutines in parallel
swg := sizedwaitgroup.New(20)
var mutex = &sync.Mutex{}
start := time.Now()
m := make(map[string]*customerPbV1.CustomerResponse)
for _, resources := range resourcesList {
  swg.Add()
  go func(resources string) {
    defer swg.Done()
    customerUrl, err := us.GenerateUrl(clientId, resources, appConfig)
    if err != nil {
      errs.NewWithCausef(err, "Could not generate the url for %s", resources)
    }
    mutex.Lock()
    m[resources] = customerUrl
    mutex.Unlock()
  }(resources)
}
swg.Wait()

elapsed := time.Since(start)
fmt.Println(elapsed)

Note: the code above will be called at high throughput from multiple reader threads, so it needs to perform well.


Solution

  • I'm not sure what sizedwaitgroup is, since the question doesn't explain it, but overall this approach doesn't look very typical of Go. "Best" is a matter of opinion, but the most typical approach in Go would be a fixed pool of worker goroutines fed by a channel, along these lines:

    func main() {
        wg := new(sync.WaitGroup)
        start := time.Now()
        numWorkers := 20
        m := make(map[string]*customerPbV1.CustomerResponse)
        work := make(chan string)
        results := make(chan result)
        for i := 0; i < numWorkers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done() // let wg.Wait return once every worker has finished
                worker(work, results)
            }()
        }
        go func() {
            for _, resources := range resourcesList {
                work <- resources
            }
            close(work)
        }()
    
        go func() {
            wg.Wait()
            close(results)
        }()
    
        for result := range results {
            m[result.resources] = result.response
        }
    
        elapsed := time.Since(start)
        fmt.Println(elapsed)
    }
    
    type result struct {
        resources string
        response  *customerPbV1.CustomerResponse
    }
    
    func worker(ch chan string, r chan result) {
        for w := range ch {
            customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
            if err != nil {
                errs.NewWithCausef(err, "Could not generate the url for %s", w)
                continue
            }
            r <- result{w, customerUrl}
        }
    }
    

    Though, based on the name, I would assume errs.NewWithCausef doesn't actually handle errors but returns one, in which case the code above is dropping them on the floor. A proper solution would then have an additional chan error for handling errors:

    func main() {
        wg := new(sync.WaitGroup)
        start := time.Now()
        numWorkers := 20
        m := make(map[string]*customerPbV1.CustomerResponse)
        work := make(chan string)
        results := make(chan result)
        errors := make(chan error)
        for i := 0; i < numWorkers; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done() // let wg.Wait return once every worker has finished
                worker(work, results, errors)
            }()
        }
    
        go func() {
            for _, resources := range resourcesList {
                work <- resources
            }
            close(work)
        }()
    
        go func() {
            wg.Wait()
            close(results)
            close(errors)
        }()
    
        go func() {
            for err := range errors {
                fmt.Println(err) // or hand err to your logger
            }
        }()
    
        for result := range results {
            m[result.resources] = result.response
        }
    
        elapsed := time.Since(start)
        fmt.Println(elapsed)
    }
    
    type result struct {
        resources string
        response  *customerPbV1.CustomerResponse
    }
    
    func worker(ch chan string, r chan result, errCh chan error) {
        for w := range ch {
            customerUrl, err := us.GenerateUrl(clientId, w, appConfig)
            if err != nil {
                // The channel is named errCh so that it does not shadow the errs package.
                errCh <- errs.NewWithCausef(err, "Could not generate the url for %s", w)
                continue
            }
            r <- result{w, customerUrl}
        }
    }
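
For anyone who wants to try the worker-pool pattern without the question's dependencies, here is a minimal, self-contained sketch. It makes a few assumptions that are not part of the original code: generateURL is a hypothetical stand-in for us.GenerateUrl (it fails for any ID starting with "bad"), the URLs are plain strings rather than *customerPbV1.CustomerResponse, and successes and failures travel over a single outcome channel instead of two separate channels, so one loop collects both before the function returns.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
	"sync"
)

// generateURL is a hypothetical stand-in for us.GenerateUrl. It fails for
// any ID that starts with "bad" so the error path can be exercised.
func generateURL(id string) (string, error) {
	if strings.HasPrefix(id, "bad") {
		return "", errors.New("simulated failure")
	}
	return "https://example.invalid/" + id, nil
}

// outcome carries either a URL or an error for one resource ID.
type outcome struct {
	id  string
	url string
	err error
}

// generateAll runs at most numWorkers calls to generateURL at a time. It
// returns the successful URLs keyed by ID and the errors keyed by failed ID.
func generateAll(ids []string, numWorkers int) (map[string]string, map[string]error) {
	work := make(chan string)
	out := make(chan outcome)

	var wg sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done() // without this, wg.Wait below never returns
			for id := range work {
				url, err := generateURL(id)
				out <- outcome{id: id, url: url, err: err}
			}
		}()
	}

	// Feed the workers, then signal that no more work is coming.
	go func() {
		for _, id := range ids {
			work <- id
		}
		close(work)
	}()

	// Close the outcome channel once every worker has returned.
	go func() {
		wg.Wait()
		close(out)
	}()

	// Only this goroutine touches the maps, so no mutex is needed.
	urls := make(map[string]string)
	failed := make(map[string]error)
	for o := range out {
		if o.err != nil {
			failed[o.id] = o.err
			continue
		}
		urls[o.id] = o.url
	}
	return urls, failed
}

func main() {
	ids := []string{"r1", "r2", "bad-3", "r4", "bad-5"}
	urls, failed := generateAll(ids, 2)
	fmt.Printf("%d URLs generated, %d failures\n", len(urls), len(failed))
	for id, err := range failed {
		fmt.Printf("could not generate the url for %s: %v\n", id, err)
	}
}
```

A failed ID only produces an entry in the failed map; the other workers keep draining the work channel, which matches the "10 resources, 2 fail, 8 entries" requirement in the question. Whether this is faster than the mutex version depends on how expensive the real URL generation is, so it is worth benchmarking both.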