Simpler Golang Concurrency Pattern


I've been trying to write a function that makes multiple external API calls concurrently, with the following requirements:

  • return the first error encountered
  • aggregate the results of the API calls

Here is my function without concurrency:

func GetMicrosoftTeamsChannelSuggestions(workspace *model.Workspace) ([]*model.MicrosoftTeamsChannel, error) {
    allChannels := []*model.MicrosoftTeamsChannel{}
    teamsGroups := GetMicrosoftTeamsGroupsFromWorkspace(workspace)

    for _, teamGroup := range teamsGroups {
        channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
        if err != nil {
            return nil, err
        }
        allChannels = append(allChannels, channels...)
    }

    return allChannels, nil
}

My concurrent solution (the second listing below) feels like overkill. For one thing, I wish I could do away with the "crazy" for loop and the channel multiplexing in the select statement. I believe there is a simpler solution.

I read about concurrency patterns, tried to apply one here, and produced the code below:

func GetMicrosoftTeamsChannelSuggestions(workspace *model.Workspace) ([]*model.MicrosoftTeamsChannel, error) {
    allChannels := []*model.MicrosoftTeamsChannel{}
    teamsGroups := GetMicrosoftTeamsGroupsFromWorkspace(workspace)

    ch := make(chan []*model.MicrosoftTeamsChannel, len(teamsGroups))
    errCh := make(chan error)

    defer func() {
        close(ch)
        close(errCh)
    }()

    var wg sync.WaitGroup
    wg.Add(len(teamsGroups))

    for _, teamGroup := range teamsGroups {
        go func(teamGroup string) {
            defer wg.Done()

            channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
            if err != nil {
                errCh <- err
            } else {
                ch <- channels
            }
        }(teamGroup)
    }

    wg.Wait()

    for {
        select {
        case channels := <-ch:
            allChannels = append(allChannels, channels...)
        case err := <-errCh:
            return nil, err // Return the first encountered error
        default:
            return allChannels, nil
        }
    }
}

Solution

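  • Your solution starts one goroutine per team group and only begins reading results after wg.Wait() returns. Because errCh is unbuffered and nothing receives from it until then, a goroutine that hits an error blocks on the send, its wg.Done() never runs, and wg.Wait() hangs. The default case in the select makes the final loop return as soon as no value is ready, which is only safe because every send has completed by that point. Also, even if one of the calls fails, all of the others are still executed.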

    You can improve this solution. First, use a context to cancel, and a single channel to transfer both result and error:

    
    type Result struct {
        Result []*model.MicrosoftTeamsChannel
        Err    error
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // Release the context when the function returns
    ch := make(chan Result) // No need for a buffered channel
    

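    Set up a reader goroutine:

    var err error
    done := make(chan struct{}) // Closed once the reader has drained ch
    go func() {
       defer close(done)
       for result := range ch {
          if result.Err != nil {
             cancel()
             if err == nil { // Record only the first error
                 err = result.Err
             }
          } else {
             allChannels = append(allChannels, result.Result...)
          }
       }
    }()
    // After close(ch), receive from done (<-done) before reading err or allChannels.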
    

    Then start the goroutines:

    var wg sync.WaitGroup
    wg.Add(len(teamsGroups))
    for _, teamGroup := range teamsGroups {
        go func(teamGroup string) {
            defer wg.Done()
            // Stop if canceled
            if ctx.Err() != nil {
                return
            }
            channels, err := GetMicrosoftTeamsChannels(*workspace.MicrosoftTeamsTenantId, teamGroup)
            ch <- Result{Result: channels, Err: err}
        }(teamGroup)
    }
    

    Wait for them to end:

    wg.Wait()
    

    Close the channel so the reader can terminate, then let the reader finish draining it before you read allChannels or err:

    close(ch)