
Reading a file line-by-line with concurrency


What I Want To Do

In GetLine, I am trying to parse a file line by line using bufio.Scanner and a naive attempt at concurrency. After reading each line, I send its text to the caller (the main function) over a channel of strings. I also send any read error over a second channel and signal completion over a done channel. The goal is for a separate goroutine to fetch the next line while the current line is being processed.

What I Have Actually Done

var READCOMPLETE = errors.New("Completed Reading")

func main() {

    filename := flag.String("filename", "", "The file to parse")
    flag.Parse()

    if *filename == "" {
        log.Fatal("Provide a file to parse")
    }

    fmt.Println("Getting file")

    names := make(chan string)
    readerr := make(chan error)
    done := make(chan bool)

    go GetLine(*filename, names, readerr, done)

    for {
        select {
        case name := <-names:
            // Process each line
            fmt.Println(name)

        case err := <-readerr:
            log.Fatal(err)

        case <-done:
            // close(names)
            // close(readerr)
            break
        }
    }

    fmt.Println("Processing Complete")
}

func GetLine(filename string, names chan string, readerr chan error, done chan bool) {
    file, err := os.Open(filename)
    if err != nil {
        log.Fatal(err)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        names <- scanner.Text()
        //fmt.Println(scanner.Text())
    }

    if err := scanner.Err(); err != nil {
        readerr <- err
    }

    done <- true
}

What I Get on Running

Runtime Error: fatal error: all goroutines are asleep - deadlock!

What have I Tried to Fix?

After reading this answer about the error message, I tried closing the channels names and readerr in the last clause of the select statement as shown in the comments. However, the program still crashes with a log message. I am unable to fix it further and would appreciate any help.
Resources for learning are welcome.

P.S.: I am relatively new to Go and still learning how to work with its CSP model of concurrency. In fact, this is my first attempt at writing a synchronous concurrent program.


Solution

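  • The break statement in a select breaks out of the select only, not the enclosing for loop. In the question's code the loop therefore keeps running after done is received. Because GetLine has already returned by then, main blocks forever on channels that nothing will send to, and the runtime reports a deadlock. The application must break out of the for loop when done. Use a label to break out of the for loop: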

    loop:
        for {
            select {
            case name := <-names:
                // Process each line
                fmt.Println(name)
    
            case err := <-readerr:
                log.Fatal(err)
    
            case <-done:
                // close(names)
                // close(readerr)
                break loop
            }
        }
    

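    The code can be simplified by eliminating the done channel. GetLine sends the result of scanner.Err() exactly once when it finishes, so a receive on readerr (nil on success) also signals completion.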

    func main() {
    
        filename := flag.String("filename", "", "The file to parse")
        flag.Parse()
    
        if *filename == "" {
            log.Fatal("Provide a file to parse")
        }
    
        fmt.Println("Getting file")
    
        names := make(chan string)
        readerr := make(chan error)
    
        go GetLine(*filename, names, readerr)
    
    loop:
        for {
            select {
            case name := <-names:
                // Process each line
                fmt.Println(name)
    
            case err := <-readerr:
                if err != nil {
                    log.Fatal(err)
                }
                break loop
            }
        }
    
        fmt.Println("Processing Complete")
    }
    
    func GetLine(filename string, names chan string, readerr chan error) {
        file, err := os.Open(filename)
        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()
    
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            names <- scanner.Text()
        }
        readerr <- scanner.Err()
    }
    

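    In this specific example, the code can be restructured to separate receiving names from receiving the error. GetLine closes names before it sends the error, so main can range over names until the channel is closed and then receive the error.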

    func main() {
        filename := flag.String("filename", "", "The file to parse")
        flag.Parse()
    
        if *filename == "" {
            log.Fatal("Provide a file to parse")
        }
    
        fmt.Println("Getting file")
    
        names := make(chan string)
        readerr := make(chan error)
    
        go GetLine(*filename, names, readerr)
    
        for name := range names {
            fmt.Println(name)
        }
        if err := <-readerr; err != nil {
            log.Fatal(err)
        }
    
        fmt.Println("Processing Complete")
    }
    
    func GetLine(filename string, names chan string, readerr chan error) {
        file, err := os.Open(filename)
        if err != nil {
            log.Fatal(err)
        }
        defer file.Close()
    
        scanner := bufio.NewScanner(file)
        for scanner.Scan() {
            names <- scanner.Text()
        }
        close(names) // close causes range on channel to break out of loop
        readerr <- scanner.Err()
    }