
Nested errgroup inside a bunch of goroutines


I am fairly new to Go and its concurrency principles. My use case involves performing multiple HTTP requests for a single entity, across a batch of entities. If any HTTP request fails for an entity, I need to stop all the other parallel HTTP requests for that entity. I also have to keep a count of the entities that failed with errors. I am trying to use an errgroup inside each entity's goroutine, so that if any HTTP request fails for an entity, the errgroup terminates and returns the error to its parent goroutine. But I am not sure how to maintain the count of errors.

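func processEntities(ctx context.Context, entity []string) []string {
    errorC := make(chan string) // channel to insert failed entity
    var wg sync.WaitGroup

    for _, link := range entity {
        wg.Add(1)
        // Spawn one goroutine per entity; each one runs its own errgroup.
        go errorgroup_spawn(ctx, link, errorC, &wg)
    }

    // Close errorC once every entity goroutine has finished,
    // so that the range loop below terminates.
    go func() {
        wg.Wait()
        close(errorC)
    }()

    var failed []string
    for msg := range errorC {
        // here storing error entityIds somewhere.
        failed = append(failed, msg)
    }
    return failed
}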

and the errgroup function looks like this:

func errorgroup_spawn(ctx context.Context, link string, errorC chan string, wg *sync.WaitGroup) error { // and other params
    defer wg.Done()

    goRoutineCollection, ctxx := errgroup.WithContext(ctx)
    results := make(chan *result)
    goRoutineCollection.Go(func() error {
        // http calls for single entity, made with ctxx so that they are
        // cancelled as soon as another call in the group fails
        // if error occurs, push it in errorC, and return Error.
        return ctxx.Err() // nil unless the context was already cancelled
    })

    go func() {
        goRoutineCollection.Wait()
        close(results)
    }()

    return goRoutineCollection.Wait()
}

PS: I was also thinking of using nested errgroups, but I can't see how to maintain error counts while the other errgroups are running. Can anyone tell me whether this is the right approach for such real-world scenarios?


Solution

  • One way to track errors is to use a status struct that records which error came from which entity:

    type Status struct {
       Entity string
       Err error
    }
    ...
    
    errorC := make(chan Status) 
    
    // Spawn one errgroup per entity, passing it the entity name; when an error happens, push Status{Entity: entityName, Err: err} to the channel
    
    

    You can then read all errors from the error channel and figure out what failed and why.
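A minimal sketch of the collecting side of this idea, to show where the failure count comes from. To stay dependency-free it does not import errgroup: `runEntity` is a hypothetical stand-in for the per-entity work, and in real code it would start that entity's errgroup and return the result of its `Wait()`. `collectFailures` and the sample entity names are assumptions as well.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
	"sync"
)

// Status pairs an entity with the error that made it fail.
type Status struct {
	Entity string
	Err    error
}

// runEntity is a hypothetical stand-in for the per-entity work. In real code
// it would start the entity's errgroup and return the result of its Wait().
func runEntity(entity string) error {
	if entity == "b" || entity == "d" {
		return errors.New("request failed for " + entity)
	}
	return nil
}

// collectFailures runs every entity concurrently and returns one Status per
// failed entity, so len(result) is the number of failed entities.
func collectFailures(entities []string) []Status {
	errorC := make(chan Status)
	var wg sync.WaitGroup

	for _, e := range entities {
		wg.Add(1)
		go func(e string) {
			defer wg.Done()
			// Report each entity at most once, after all of its work is done.
			if err := runEntity(e); err != nil {
				errorC <- Status{Entity: e, Err: err}
			}
		}(e)
	}

	// Close the channel once every worker has finished so the range below ends.
	go func() {
		wg.Wait()
		close(errorC)
	}()

	var failed []Status
	for st := range errorC {
		failed = append(failed, st)
	}
	// Arrival order is nondeterministic; sort for a stable result.
	sort.Slice(failed, func(i, j int) bool { return failed[i].Entity < failed[j].Entity })
	return failed
}

func main() {
	failed := collectFailures([]string{"a", "b", "c", "d"})
	fmt.Println("failed entities:", len(failed))
	for _, st := range failed {
		fmt.Printf("%s: %v\n", st.Entity, st.Err)
	}
}
```

Sending the Status once per entity, after the entity's work has returned, keeps the count at one entry per failed entity even when several of its requests fail.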

    Another option is not to use errgroups at all. This makes things more explicit, but whether it is better is debatable:

    // Keep entity statuses
    statuses := make([]Status, len(entity))
    for i, link := range entity {
       statuses[i].Entity = link
       wg.Add(1)
       go func(i int) {
          defer wg.Done()
          ctx, cancel := context.WithCancel(context.Background())
          defer cancel()

          // Error collector
          status := make(chan error)
          collected := make(chan struct{}) // closed when the collector exits
          go func() {
             defer close(collected)
             for st := range status {
                 if st != nil {
                    cancel()  // Stop all calls
                    // store first error
                    if statuses[i].Err == nil {
                       statuses[i].Err = st
                    }
                 }
             }
          }()

          innerWg := sync.WaitGroup{}
          innerWg.Add(1)
          go func() {
             defer innerWg.Done()
             status <- makeHttpCall(ctx)
          }()
          innerWg.Add(1)
          go func() {
             defer innerWg.Done()
             status <- makeHttpCall(ctx)
          }()
          ...
          innerWg.Wait()

          // All senders are done: close the channel and wait for the
          // collector to finish before wg.Done() marks this entity complete.
          close(status)
          <-collected
       }(i)
    }
    wg.Wait()
    

    When everything is done, statuses will contain all entities and their corresponding statuses.
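As a rough end-to-end sketch of this second approach, the program below fills a statuses slice and then counts the failed entities. It is a variant rather than a copy of the code above: it uses sync.Once in place of the collector goroutine to keep the first error and cancel the remaining calls. The names `call`, `runEntity`, `runAll`, `countFailed` and `demoCalls` are hypothetical, and the "calls" are stubs instead of real HTTP requests.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"
)

type Status struct {
	Entity string
	Err    error
}

// call is a hypothetical stand-in for one HTTP request of an entity.
type call func(ctx context.Context) error

// runEntity runs all calls of one entity concurrently, cancels the shared
// context on the first failure, and returns that first error (or nil).
func runEntity(calls []call) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var (
		wg    sync.WaitGroup
		once  sync.Once
		first error
	)
	for _, c := range calls {
		wg.Add(1)
		go func(c call) {
			defer wg.Done()
			if err := c(ctx); err != nil {
				once.Do(func() {
					first = err // keep only the first error
					cancel()    // stop the entity's other calls
				})
			}
		}(c)
	}
	wg.Wait()
	return first
}

// runAll processes every entity in its own goroutine. Each goroutine writes
// only to its own slot of statuses, so no extra locking is needed.
func runAll(entity []string, callsFor func(string) []call) []Status {
	statuses := make([]Status, len(entity))
	var wg sync.WaitGroup
	for i, link := range entity {
		statuses[i].Entity = link
		wg.Add(1)
		go func(i int, link string) {
			defer wg.Done()
			statuses[i].Err = runEntity(callsFor(link))
		}(i, link)
	}
	wg.Wait()
	return statuses
}

// countFailed returns the number of entities whose status carries an error.
func countFailed(statuses []Status) int {
	n := 0
	for _, st := range statuses {
		if st.Err != nil {
			n++
		}
	}
	return n
}

// demoCalls builds stub calls: entities named "bad..." get one failing call
// and one call that blocks until the context is cancelled.
func demoCalls(name string) []call {
	ok := func(ctx context.Context) error { return nil }
	if !strings.HasPrefix(name, "bad") {
		return []call{ok, ok}
	}
	fail := func(ctx context.Context) error { return errors.New("boom") }
	blocked := func(ctx context.Context) error { <-ctx.Done(); return ctx.Err() }
	return []call{fail, blocked}
}

func main() {
	statuses := runAll([]string{"ok-1", "bad-1", "ok-2", "bad-2"}, demoCalls)
	fmt.Println("failed:", countFailed(statuses), "of", len(statuses))
}
```

Because every entity owns exactly one slot in the slice, the count is read only after the outer `wg.Wait()` returns, and no channel is needed to aggregate results.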