go concurrency channel

What's the best practice in Golang to track the done status of two goroutines in a third goroutine?


I have three goroutines running concurrently. Two of them do some processing and send their results to a results channel. The third goroutine "tallies" results by reading the results channel. I could use a WaitGroup to wait for the two computational goroutines to finish and then range over the results channel to tally the results. But this wouldn't scale: it would require a buffered results channel with a massive buffer size, which is unacceptable in production code.

I want to tally the results AS the processing occurs, but I don't want to exit the program until all tallying is complete. What is the best practice for achieving this in Go?

Here is my current approach that works just fine. I'm wondering whether there is a better way, since this seems a little clunky?

package main

import (
    "fmt"
    "sync"
)

type T struct{}

func main() {
    var widgetInventory int = 1000
    transactions := make(chan int, 100)
    salesDone := make(chan T)
    purchasesDone := make(chan T)
    var wg sync.WaitGroup
    fmt.Println("Starting inventory count = ", widgetInventory)

    go makeSales(transactions, salesDone)
    go newPurchases(transactions, purchasesDone)

    wg.Add(1)

    go func() {
        salesAreDone := false
        purchasesAreDone := false

        for {
            select {
            case transaction := <-transactions:
                widgetInventory += transaction
            case <-salesDone:
                salesAreDone = true
            case <-purchasesDone:
                purchasesAreDone = true
            default:
                if salesAreDone && purchasesAreDone {
                    wg.Done()
                    return
                }
            }
        }
    }()

    wg.Wait()
    fmt.Println("Ending inventory count = ", widgetInventory)
}

func makeSales(transactions chan int, salesDone chan T) {
    for i := 0; i < 3000; i++ {
        transactions <- -100
    }

    salesDone <- struct{}{}
}

func newPurchases(transactions chan int, purchasesDone chan T) {
    for i := 0; i < 3000; i++ {
        transactions <- 100
    }

    purchasesDone <- struct{}{}
}

Solution

  • Here is my current approach that works just fine

    Not for any reasonable definition of just fine. You have a hot for loop here:

    for {
        select {
        case transaction := <-transactions:
            widgetInventory += transaction
        case <-salesDone:
            salesAreDone = true
        case <-purchasesDone:
            purchasesAreDone = true
        default:
            if salesAreDone && purchasesAreDone {
                wg.Done()
                return
            }
        }
    }
    

    The default case runs whenever none of the channels is ready to read. The loop then starts over immediately instead of blocking, so it spins on the CPU while it waits, and this happens quite a lot.

    A slightly adjusted version of your code that counts how often the default case runs illustrates the "heat" of this loop. Exact results will vary, and the count can be quite high:

    Default case ran 27305 times
    

    You don't want a default case when selecting from channels unless that default case will also block on something inside it. Otherwise you get a hot loop like this.

    A better way: select with nilable channels

    Generally, in a select, you want to identify closed channels and set the channel variable to nil. A receive from a nil channel blocks forever, so select will never choose that case; this essentially "disables" it.

    Consider this modified version of your code:

        go func(transactions chan int, salesDone <-chan T, purchasesDone <-chan T) {
            defer wg.Done()
            // Loop until transactions has been closed and fully drained.
            for transactions != nil {
                select {
                case transaction, ok := <-transactions:
                    if ok {
                        widgetInventory += transaction
                    } else {
                        // Closed and drained: disable this case and end the loop.
                        transactions = nil
                    }
                case <-salesDone:
                    // Disable this case; close transactions once both producers are done.
                    salesDone = nil
                    if purchasesDone == nil {
                        close(transactions)
                    }
                case <-purchasesDone:
                    purchasesDone = nil
                    if salesDone == nil {
                        close(transactions)
                    }
                }
            }
        }(transactions, salesDone, purchasesDone)
    

    With these adjustments to the consumer, we don't have a hot loop; we always block until we read from a channel. Once both salesDone and purchasesDone have been "signaled", we close(transactions). Once we have drained transactions and it is closed, we set transactions to nil. We loop while transactions is not nil; in this code, by the time it becomes nil, the other two channels are nil as well.

    A subtle but important point: I pass the channels to this function as arguments so that its channel variables are its own copies and are not shared with main. Otherwise, setting transactions to nil would be writing a variable that's shared across goroutines. In this case, however, it doesn't matter anyway because we "know" we're the last one to read from transactions.
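    The nil-channel technique can also be shown on its own. The sketch below is not the inventory code: it assumes a hypothetical helper sumBoth and, unlike the version above, assumes each producer closes its own channel rather than signaling on a separate done channel.

```go
package main

import "fmt"

// sumBoth (hypothetical helper) receives from two channels until both
// have been closed and returns the total of everything received.
func sumBoth(a, b <-chan int) int {
	total := 0
	for a != nil || b != nil {
		select {
		case v, ok := <-a:
			if !ok {
				a = nil // closed: a receive on nil blocks forever, so this case is never chosen again
				continue
			}
			total += v
		case v, ok := <-b:
			if !ok {
				b = nil // same for the second channel
				continue
			}
			total += v
		}
	}
	return total
}

func main() {
	a, b := make(chan int), make(chan int)

	go func() {
		defer close(a)
		for i := 1; i <= 3; i++ {
			a <- i
		}
	}()
	go func() {
		defer close(b)
		for i := 0; i < 2; i++ {
			b <- 10
		}
	}()

	fmt.Println(sumBoth(a, b)) // 1+2+3 + 10+10 = 26
}
```

    Without the nil assignments, a closed channel would be ready on every pass and the select would keep choosing it, which is another way to end up with a hot loop.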

    A simpler option: multiple wait groups

    If you think about what you're doing here, you want to wait until both producers are done producing to transactions. Then you want to drain transactions. Once the channel is closed and drained, main knows the summation is complete.

    You don't need select to do that. And a select with a case for each "worker", so to speak, is quite inelegant; you have to hard-code the number of workers and handle each "Done" channel individually.

    All you need to do is:

    • Add a var resultswg sync.WaitGroup for the consumer, in addition to the one used for the producers.
    • Each producer calls defer wg.Done().
    • The consumer calls defer resultswg.Done() before ranging over transactions:
      go func() {
          defer resultswg.Done()
          for transaction := range transactions {
              widgetInventory += transaction
          }
      }()
      
    • main handles waiting on the producers, closing transactions to end the range, and then waiting on the consumer:
      wg.Wait()
      close(transactions)
      resultswg.Wait()
      

    It ends up being short and sweet when coded this way:

    package main
    
    import (
        "fmt"
        "sync"
    )
    
    func main() {
        var widgetInventory int = 1000
        transactions := make(chan int, 100)
    
        var wg, resultswg sync.WaitGroup
        fmt.Println("Starting inventory count = ", widgetInventory)
        wg.Add(2)
    
        go makeSales(transactions, &wg)
        go newPurchases(transactions, &wg)
        resultswg.Add(1)
        go func() {
            defer resultswg.Done()
            for transaction := range transactions {
                widgetInventory += transaction
            }
        }()
    
        wg.Wait()
        close(transactions)
        resultswg.Wait()
        fmt.Println("Ending inventory count = ", widgetInventory)
    }
    
    func makeSales(transactions chan int, wg *sync.WaitGroup) {
        defer wg.Done()
        for i := 0; i < 3000; i++ {
            transactions <- -100
        }
    }
    
    func newPurchases(transactions chan int, wg *sync.WaitGroup) {
        defer wg.Done()
        for i := 0; i < 3000; i++ {
            transactions <- 100
        }
    }
    

    You can see here that you could have any number of producers in this pattern; you just have to wg.Add(1) for each producer.
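    As a rough sketch of that generalization, the same pattern can be wrapped in a function that starts one producer per input. The helper name tally, its parameters, and the extra deltas in main are assumptions for illustration and are not part of the code above.

```go
package main

import (
	"fmt"
	"sync"
)

// tally (hypothetical helper) starts one producer goroutine per entry in
// deltas; each producer sends its delta count times. A single consumer
// adds every value to the starting total.
func tally(start int, deltas []int, count int) int {
	transactions := make(chan int, 100)
	var producers, consumer sync.WaitGroup

	for _, d := range deltas {
		producers.Add(1) // one Add per producer, before its goroutine starts
		go func(delta int) {
			defer producers.Done()
			for i := 0; i < count; i++ {
				transactions <- delta
			}
		}(d)
	}

	total := start
	consumer.Add(1)
	go func() {
		defer consumer.Done()
		for t := range transactions {
			total += t
		}
	}()

	producers.Wait()    // every send has completed
	close(transactions) // ends the consumer's range loop once the buffer is drained
	consumer.Wait()     // total is safe to read after this returns
	return total
}

func main() {
	// The four deltas cancel out, so the total ends where it started.
	fmt.Println(tally(1000, []int{-100, 100, 25, -25}, 3000)) // 1000
}
```

    The number of producers is now just the length of a slice; nothing in the consumer or in the shutdown sequence has to change when it grows.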

    I use this pattern all the time to parallelize work when I don't know how many results will be returned from each worker. I find it to be easy to comprehend and a lot simpler than trying to select multiple channels. In fact, I'd go so far as to say that if you find yourself selecting from multiple channels, you should step back and make sure it really makes sense for you. I use select far less often than I use wait groups.