Search code examples
go, goroutine

Program deadlocks despite using a WaitGroup


I'm writing a program that reads a list of order numbers from a file called orders.csv and compares them with the other CSV files in the same folder.

The problem is that it deadlocks even though I use a sync.WaitGroup, and I don't know why.

Stack Overflow flagged my post as mostly code, so I'm adding this note: the whole program is included because all of it is needed to reproduce and debug the problem.

package main

import (
    "bufio"
    "fmt"
    "io"
    "log"
    "os"
    "path/filepath"
    "strings"
    "sync"
)

// Files holds the names of the CSV files that get compared against the
// order list.
type Files struct{ filenames []string }

// Orders holds the order IDs, one entry per line read by getOrderIDs.
type Orders struct{ ID []string }

// ordersFilename names the file listing the order IDs to look for; it is
// also excluded from the files that are checked.
var ordersFilename = "orders.csv"

func main() {
    var (
        ordersFile *os.File
        files       Files
        orders     Orders
        err        error
    )

    mu := new(sync.Mutex)
    wg := &sync.WaitGroup{}
    wg.Add(1)

    if ordersFile, err = os.Open(ordersFilename); err != nil {
        log.Fatalln("Could not open file: " + ordersFilename)
    }

    orders = getOrderIDs(ordersFile)

    files.filenames = getCSVsFromCurrentDir()

    var filenamesSize = len(files.filenames)
    var ch = make(chan map[string][]string, filenamesSize)
    var done = make(chan bool)

    for i, filename := range files.filenames {
        go func(currentFilename string, ch chan<- map[string][]string, i int, orders Orders, wg *sync.WaitGroup, filenamesSize *int, mu *sync.Mutex, done chan<- bool) {
            wg.Add(1)
            defer wg.Done()
            checkFile(currentFilename, orders, ch)
            mu.Lock()
            *filenamesSize--
            mu.Unlock()
            if i == *filenamesSize {
                done <- true
                close(done)
            }
        }(filename, ch, i, orders, wg, &filenamesSize, mu, done)
    }

    select {
    case str := <-ch:
        fmt.Printf("%+v\n", str)
    case <-done:
        wg.Done()
        break
    }

    wg.Wait()
    close(ch)
}

// getCSVsFromCurrentDir returns, in lexical order, the names of the
// non-directory entries of the current directory that end in ".csv",
// excluding ordersFilename. Subdirectories are not descended into, and a
// directory whose name happens to end in ".csv" is not returned. The program
// is terminated via log.Fatalln if the directory cannot be walked.
func getCSVsFromCurrentDir() []string {
    var filenames []string

    err := filepath.Walk(".", func(path string, info os.FileInfo, err error) error {
        if err != nil {
            // info may not be usable when err is set; abort the walk and
            // let the caller report the failure.
            return err
        }
        if info.IsDir() {
            if path == "." {
                return nil
            }
            // Only the current directory is scanned.
            return filepath.SkipDir
        }
        if strings.HasSuffix(path, ".csv") && path != ordersFilename {
            filenames = append(filenames, path)
        }
        return nil
    })

    if err != nil {
        log.Fatalln("Could not read file names in current dir")
    }

    return filenames
}

// getOrderIDs reads file line by line (via readLine, which strips the "\n"
// or "\r\n" terminator) and returns an Orders whose ID slice holds every
// line in file order, blank lines included. An empty file yields an empty
// Orders. A read error other than io.EOF terminates the program with
// log.Fatalln. The file is not closed; that remains the caller's job.
func getOrderIDs(file *os.File) Orders {
    var orders Orders
    reader := bufio.NewReader(file)

    for {
        line, err := readLine(reader)
        switch {
        case err == nil:
            orders.ID = append(orders.ID, line)
        case err == io.EOF:
            // readLine can return a final chunk together with io.EOF when the
            // last, unterminated line ended exactly on a buffer boundary.
            if line != "" {
                orders.ID = append(orders.ID, line)
            }
            return orders
        default:
            log.Fatalln("Could not read file: " + ordersFilename)
        }
    }
}

// checkFile reads filename line by line and sends exactly one map on ch.
// If at least one line equals an entry of orders.ID, the map has a single
// key, filename, whose value lists those lines in first-seen order without
// duplicates; otherwise the map is empty. Lines are compared verbatim.
//
// log.Fatalln terminates the program if the file cannot be opened or a read
// fails with an error other than io.EOF. An empty file yields an empty map.
func checkFile(filename string, orders Orders, ch chan<- map[string][]string) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatalln("Could not read file: " + filename)
    }

    orderFilesMap := make(map[string][]string)
    reader := bufio.NewReader(file)

    for {
        line, readErr := readLine(reader)
        if readErr != nil && readErr != io.EOF {
            log.Fatalln("Could not read file: " + filename)
        }
        // readLine can return a final chunk together with io.EOF when the
        // last, unterminated line ended exactly on a buffer boundary.
        if readErr == nil || line != "" {
            if containedInSlice(line, orders.ID) && !containedInSlice(line, orderFilesMap[filename]) {
                orderFilesMap[filename] = append(orderFilesMap[filename], line)
            }
        }
        if readErr == io.EOF {
            break
        }
    }

    // Release the descriptor before the send, which may block until the
    // receiver is ready. The file was only read, so a Close error is ignored.
    file.Close()

    ch <- orderFilesMap
}

// containedInSlice reports whether str equals at least one element of
// slice, scanning it from the front. A nil or empty slice contains nothing.
func containedInSlice(str string, slice []string) bool {
    for i := 0; i < len(slice); i++ {
        if slice[i] == str {
            return true
        }
    }
    return false
}

// readLine returns the next line from r without its "\n" or "\r\n"
// terminator. A line longer than r's buffer, which bufio.Reader.ReadLine
// delivers in several isPrefix chunks, is reassembled into one string.
// The error from the last ReadLine call is returned together with whatever
// was accumulated so far.
func readLine(r *bufio.Reader) (string, error) {
    var buf []byte
    for {
        chunk, more, err := r.ReadLine()
        buf = append(buf, chunk...)
        if err != nil || !more {
            return string(buf), err
        }
    }
}

Solution

    1. The first issue is that wg.Add must always be called outside of (before starting) the goroutine(s) it accounts for. Otherwise wg.Wait might run before the goroutine(s) have actually started (and called wg.Add), and it will therefore "think" that there is nothing to wait for.

    2. The second issue is that the code uses two different mechanisms to wait for the goroutines to finish: the WaitGroup and the done channel. Use only one of them. Which one to pick also depends on how the goroutines' results are used, which brings us to the next problem.

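    3. The third issue is how the results are gathered. Currently the code only receives and prints a single result from the goroutines. Put a for { ... } loop around the select and return from it once the done channel is closed. (Note that you don't need to send anything on the done channel; closing it is enough.) This single select is also what produces the deadlock: when it receives a result from ch instead of from done, the wg.Done() in the done case never runs, so wg.Wait() blocks forever on the initial wg.Add(1). Meanwhile, any goroutine that tries to send on the unbuffered done channel blocks forever as well, so its deferred wg.Done() never runs either.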

    Improved Version 0.0.1

    Here is the first version (with some other code cleanup), which uses the done channel to signal completion and drops the WaitGroup:

    // main (version 0.0.1) waits for the workers with a done channel instead
    // of a WaitGroup. A mutex-guarded counter tracks how many workers are
    // still running; whichever worker brings it to zero closes done.
    func main() {
        ordersFile, err := os.Open(ordersFilename)
        if err != nil {
            log.Fatalln("Could not open file: " + ordersFilename)
        }
        orders := getOrderIDs(ordersFile)
        // getOrderIDs does not close the file; it was only read, so the
        // Close error is not interesting.
        ordersFile.Close()

        files := Files{
            filenames: getCSVsFromCurrentDir(),
        }
        if len(files.filenames) == 0 {
            // No worker would ever close done, so the loop below would block forever.
            return
        }

        var (
            mu        sync.Mutex
            remaining = len(files.filenames)
            ch        = make(chan map[string][]string, len(files.filenames))
            done      = make(chan struct{})
        )

        for _, filename := range files.filenames {
            go func(currentFilename string) {
                checkFile(currentFilename, orders, ch)
                // Decrement and test under the same lock so exactly one
                // worker observes the counter reaching zero.
                mu.Lock()
                remaining--
                last := remaining == 0
                mu.Unlock()
                if last {
                    // Closing (rather than sending on) done needs no receiver.
                    close(done)
                }
            }(filename)
        }

        for {
            select {
            case str := <-ch:
                fmt.Printf("%+v\n", str)
            case <-done:
                // Every worker sent its result before decrementing the
                // counter, so no further sends can happen: close ch and
                // print whatever is still buffered before returning.
                close(ch)
                for str := range ch {
                    fmt.Printf("%+v\n", str)
                }
                return
            }
        }
    }
    

    Improved Version 0.0.2

    1. In your case, however, we have an advantage: we know exactly how many goroutines we started and therefore how many results to expect (as long as each goroutine sends exactly one result, which this code currently does). That gives us another option: collect the results with a second for loop that runs the same number of iterations:
    // main (version 0.0.2) relies on knowing how many results to expect:
    // checkFile either terminates the program or sends exactly one map on ch,
    // so receiving len(files.filenames) values collects every result without
    // any extra synchronization.
    func main() {
        ordersFile, err := os.Open(ordersFilename)
        if err != nil {
            log.Fatalln("Could not open file: " + ordersFilename)
        }
        orders := getOrderIDs(ordersFile)
        // getOrderIDs does not close the file; release it once it has been read.
        ordersFile.Close()

        files := Files{
            filenames: getCSVsFromCurrentDir(),
        }

        // Note: a buffered channel helps speed things up. The buffer size does not need to match the number of
        //   items that will be passed through the channel. A fixed, small size is perfect here.
        ch := make(chan map[string][]string, 5)

        for _, filename := range files.filenames {
            go func(filename string) {
                // orders and ch are not loop variables and can be used without copying
                checkFile(filename, orders, ch)
            }(filename)
        }

        for range files.filenames {
            str := <-ch
            fmt.Printf("%+v\n", str)
        }
    }
    

    A lot simpler, isn't it? Hope that helps!