Using Go, I have large log files. Currently I open them, create a new scanner bufio.NewScanner
, and then for scanner.Scan()
to loop through the lines. Each line is sent through a processing function, which matches it to regular expressions and extracts data. I would like to process this file in chunks simultaneously using goroutines. I believe this may be quicker than looping through the whole file sequentially.
It can take a few seconds per file, and I'm wondering if I can process a single file in, say, 10 pieces at a time. I believe I can sacrifice the memory if needed. I have ~3gb, and the biggest log file is maybe 75mb.
I see that a scanner
has a .Split()
method, where you can provide a custom split function, but I wasn't able to find a good solution using this method.
I've also tried creating a slice of slices, looping through the scanner with scanner.Scan()
and appending scanner.Text()
to each slice.
eg:
// pseudocode because I couldn't get this to work either
scanner := bufio.NewScanner(logInfo)
threads := [[], [], [], [], []]
i := 0
for scanner.Scan() {
i = i + 1
if i > 5 {
i = 0
}
threads[i] = append(threads[i], scanner.Text())
}
fmt.Println(threads)
I'm new to Go and concerned about efficiency and performance. I want to learn how to write good Go code! Any help or advice is really appreciated.
Peter gives a good starting point, if you wanted to do something like a fan-out, fan-in pattern you could do something like:
package main
import (
"bufio"
"fmt"
"log"
"os"
"sync"
)
func main() {
file, err := os.Open("/path/to/file.txt")
if err != nil {
log.Fatal(err)
}
defer file.Close()
lines := make(chan string)
// start four workers to do the heavy lifting
wc1 := startWorker(lines)
wc2 := startWorker(lines)
wc3 := startWorker(lines)
wc4 := startWorker(lines)
scanner := bufio.NewScanner(file)
go func() {
defer close(lines)
for scanner.Scan() {
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()
merged := merge(wc1, wc2, wc3, wc4)
for line := range merged {
fmt.Println(line)
}
}
func startWorker(lines <-chan string) <-chan string {
finished := make(chan string)
go func() {
defer close(finished)
for line := range lines {
// Do your heavy work here
finished <- line
}
}()
return finished
}
func merge(cs ...<-chan string) <-chan string {
var wg sync.WaitGroup
out := make(chan string)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan string) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
}