Search code examples
goconcurrencydelve

Golang WaitGroup.Done() being skipped


I've got a function that relies on checking some errors concurrently and I'm trying to use waitgroups to wait until all processes that return possible errors are done before I check all the errors.

It seems to be skipping some of the wg.Done() cals. Here's a youtube video of the debug (it loops through the 'for' loop 3 times, sorry): Golang Delve Debug for WaitGroups

Any idea why it'd be skipping some of the waitgroup.Done() calls?

Here's the code:

package controllers

import (
    "errors"
    "mobilebid/billable"
    db "mobilebid/database"
    "mobilebid/stripe"
    "net/http"
    "os"
    "strconv"
    "sync"
    "time"

    log "github.com/Sirupsen/logrus"
    "github.com/gorilla/mux"
)

var (
    errBillableID     = errors.New("It looks like there was an error while getting your billable ID. Do you have a credit card set up?")
    errWinningItems   = errors.New("It looks like there was an error while gathering your winning items. Please contact an event rep.")
    errAcctInfo       = errors.New("We had some trouble getting the account information for the event. Please contact an event rep.")
    errLoggingTrans   = errors.New("It looks like we had some sort of issue while logging your transaction. Please contact an event rep.")
    errParsingURL     = errors.New("We had some issue looking at the URL.")
    errStripeIssue    = errors.New("It looks like there was some kind of issue while talking with Stripe. If you were in the middle of a transaction, this doesn't mean the transaction was cancelled. Take a look at your transactions and/or contact an event rep.")
    errItemsPurchased = errors.New("One or more of the items you're trying to purchase have already been purchased. If this doesn't sound right, please contact an event rep.")
)

func createLogCtx(bidderID, eventID int) *log.Entry {
    return log.WithFields(log.Fields{
        "bidderID": bidderID,
        "eventID":  eventID,
    })
}

var wg sync.WaitGroup

const gorutineCt = 6

//PurchaseItems purchases items from the event for the bidder and sends the funds to the customer
//  In order for PurchaseItems to work:
//      1. Bidder must have a customer account set up in Stripe
//      2. Event owner needs to have their Stripe registered with the apps Stripe account
//      3. Item must not have been purchased before (ever)
func PurchaseItems(dB db.AppDB) http.HandlerFunc {
    return http.HandlerFunc(func(res http.ResponseWriter, req *http.Request) {

        ps := mux.Vars(req)

        eventID, err := strconv.Atoi(ps["eventID"])
        if err != nil {
            log.Error(err.Error())
            res.Write(ResErr(errParsingURL.Error()))
            return
        }

        bidderID, err := strconv.Atoi(ps["bidderID"])
        if err != nil {
            log.Error(err.Error())
            res.Write(ResErr(errParsingURL.Error()))
            return
        }

        itemsChan := make(chan []db.ItemWon)
        billableBidderIDChan := make(chan string)
        creditableAcctChan := make(chan string)
        errsChan := make(chan error, gorutineCt)
        wg.Add(gorutineCt)

        logCtx := createLogCtx(bidderID, eventID)

        acct := stripe.New(os.Getenv("SECRET_KEY"), os.Getenv("PUBLISHABLE_KEY"))

        go func() {
            id, e := dB.GetBidderBillableID(bidderID)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errBillableID
                billableBidderIDChan <- id
            } else {
                errsChan <- nil
                billableBidderIDChan <- id
            }
            wg.Done()
        }()

        go func() {
            i, e := dB.GetWinningItemsForBidder(bidderID, eventID)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errWinningItems
                itemsChan <- i
            } else {
                errsChan <- nil
                itemsChan <- i
            }
            wg.Done()
        }()

        go func() {
            a, e := dB.GetCreditableAccountFromEvent(eventID)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errAcctInfo
                creditableAcctChan <- a
            } else {
                errsChan <- nil
                creditableAcctChan <- a
            }
            wg.Done()
        }()

        go func() {
            items := <-itemsChan
            for _, val := range items {
                e := dB.CheckIfItemPurchased(val.ItemID)
                if e != nil {
                    logCtx.WithFields(log.Fields{
                        "itemID":     val.ItemID,
                        "_timestamp": time.Now(),
                    }).Error(e.Error())
                    errsChan <- errItemsPurchased
                    itemsChan <- items
                    wg.Done()
                    return
                }
            }
            errsChan <- nil
            itemsChan <- items
            wg.Done() //SKIPPED
        }()

        go func() {
            billableBidderID := <-billableBidderIDChan
            e := acct.BuyerIsBillable(billableBidderID)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errStripeIssue
                billableBidderIDChan <- billableBidderID
            } else {
                errsChan <- nil
                billableBidderIDChan <- billableBidderID
            }
            wg.Done()
        }()

        go func() {
            creditableAcct := <-creditableAcctChan
            e := acct.CanReceiveFunds(creditableAcct)
            if e != nil {
                logCtx.Error(e.Error())
                errsChan <- errStripeIssue
                creditableAcctChan <- creditableAcct
            } else {
                errsChan <- nil
                creditableAcctChan <- creditableAcct
            }
            wg.Done()
        }()

        wg.Wait()
        close(errsChan)

        if err = checkConcurrentErrs(errsChan); err != nil {
            logCtx.Error(err.Error())
            res.Write(ResErr(err.Error()))
            return
        }

        items := <-itemsChan
        amount := addItems(items)
        appFee := calculateFee(amount, .03) //TODO: Store this somewhere where it can be edited without having to restart the app.

        invoice := billable.BillObject{
            Desc:     "Test Charge", //TODO: Generate this description from the event, items and bidder somehow.
            Amount:   amount,
            Currency: "usd",
            Dest:     <-creditableAcctChan,
            Fee:      appFee,
            Meta:     createItemsList(items),
            Customer: <-billableBidderIDChan,
        }

        trans, err := acct.ChargeBidder(invoice)
        if err != nil {
            logCtx.Error(err.Error())
            res.Write(ResErr(errStripeIssue.Error()))
            return
        }

        logCtx.WithFields(log.Fields{
            "stripeTransID": trans.TransID,
            "itemcCount":    len(items),
        }).Info("Transferred funds from bidder to client")

        dbTrans := db.Transaction{
            TransID:  trans.TransID,
            UserID:   5,
            BidderID: bidderID,
            EventID:  eventID,
            Amount:   int64(amount),
            AppFee:   int64(appFee),
            Desc:     "Some test order",
            Status:   "completed",
        }

        orderID, err := dB.InsertTransaction(dbTrans)
        if err != nil {
            logCtx.WithFields(log.Fields{
                "stripeTransID": dbTrans.TransID,
                "_timestamp":    time.Now(),
            }).Error(err.Error())
            res.Write(ResErr(errLoggingTrans.Error()))
            return
        }

        for it, val := range items {
            i := db.TransactionLine{
                OrderID: orderID,
                ItemID:  val.ItemID,
                Amount:  uint64(val.Bid * 100), //Must do this since the bid is in dollars but the amount is pennies
                Line:    it,
            }

            err := dB.InsertTransactionLine(i)
            if err != nil {
                logCtx.WithFields(log.Fields{
                    "stripeTransID": dbTrans.TransID,
                    "lineNumber":    i,
                    "_timestamp":    time.Now(),
                }).Error(err.Error())
                res.Write(ResErr(errLoggingTrans.Error()))
                return
            }
        }

        logCtx.WithField("orderID", orderID).Info("Order created")

        //TODO: Send receipt to buyer.
        res.Write(ResOK(trans.TransID))

    })
}

Solution

  • For the sake of posterity (and google searches):

    Put wg.Add(1) before each go func() line, instead of doing it in a single go with wg.Add(gorutineCt)

    Put defer wg.Done() at the start of each goroutine enclosure, instead of calling wg.Done() in every exit case. This ensures wg.Done() gets run regardless.

    Use a closer routine instead of trying to buffer a channel sufficiently:

    // start other goroutines
    
    go func () {
        wg.Wait()
        close(errschan)
    }
    
    for _, err := range errsChan { // automatically terminates once chan is closed
        if err != nil {
             // handle err
        }
    }