I am trying to load a large CSV file in Go using goroutines. The CSV has dimensions (254882, 100). However, when I parse the CSV in goroutines and store the rows in a 2D slice, I get fewer than 254882 rows, and the number varies from run to run. I suspect the goroutines are the cause, but I can't pinpoint the reason. Can anyone please help? I am new to Go. Here is my code:
func loadCSV(csvFile string) (*[][]float64, error) {
	startTime := time.Now()
	var dataset [][]float64
	f, err := os.Open(csvFile)
	if err != nil {
		return &dataset, err
	}
	r := csv.NewReader(bufio.NewReader(f))
	counter := 0
	var wg sync.WaitGroup
	for {
		record, err := r.Read()
		if err == io.EOF {
			break
		}
		if counter != 0 {
			wg.Add(1)
			go func(r []string, dataset *[][]float64) {
				var temp []float64
				for _, each := range record {
					f, err := strconv.ParseFloat(each, 64)
					if err == nil {
						temp = append(temp, f)
					}
				}
				*dataset = append(*dataset, temp)
				wg.Done()
			}(record, &dataset)
		}
		counter++
	}
	wg.Wait()
	duration := time.Now().Sub(startTime)
	log.Printf("Loaded %d rows in %v seconds", counter, duration)
	return &dataset, nil
}
My main function looks like this:
func main() {
	// runtime.GOMAXPROCS(4)
	dataset, err := loadCSV("AvgW2V_train.csv")
	if err != nil {
		panic(err)
	}
	fmt.Println(len(*dataset))
}
If anyone needs the CSV (485 MB), it can be downloaded here: https://drive.google.com/file/d/1G4Nw6JyeC-i0R1exWp5BtRtGM1Fwyelm/view?usp=sharing
There is no need to use *[][]float64, as that would effectively be a double pointer: a slice value already contains a pointer to its backing array.
I have made some minor modifications to your program.
dataset is available to the new goroutine because it is declared in the enclosing block. Similarly, record is also available, but since the record variable changes from one iteration to the next, we need to pass it to the new goroutine as an argument. There is no need to pass dataset, as the variable itself does not change, and that is what we want so that every goroutine can append temp to the same dataset.
However, a race condition occurs when multiple goroutines append to the same variable at the same time, i.e., when multiple goroutines write to the same variable concurrently. Appends can overwrite each other, which is why rows go missing and the count varies between runs.
So we need to make sure that only one goroutine can append at any given time. We use a lock (a mutex) to make the appends sequential.
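The effect of the lock can be seen in isolation with a small sketch that is separate from the CSV program. The function name appendConcurrently and the count of 10000 are hypothetical, chosen only for illustration; each goroutine appends one value to a shared slice while holding a mutex.

```go
package main

import (
	"fmt"
	"sync"
)

// appendConcurrently is a hypothetical helper: it starts n goroutines that
// each append one value to a shared slice, guarding the append with a mutex.
func appendConcurrently(n int) []int {
	var (
		mu     sync.Mutex
		wg     sync.WaitGroup
		result []int
	)
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			mu.Lock()
			result = append(result, v) // only one goroutine appends at a time
			mu.Unlock()
		}(i)
	}
	wg.Wait()
	return result
}

func main() {
	fmt.Println(len(appendConcurrently(10000))) // prints 10000
}
```

If the Lock and Unlock calls are removed, the printed length is likely to be smaller than 10000 and to differ between runs, which matches the symptom in the question. Running the program with `go run -race` should also report the data race in that case. Note that the order of the values in the slice is not deterministic either way.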
package main

import (
	"encoding/csv"
	"fmt"
	"os"
	"strconv"
	"sync"
)

func loadCSV(csvFile string) [][]float64 {
	var dataset [][]float64
	f, _ := os.Open(csvFile)
	r := csv.NewReader(f)
	var wg sync.WaitGroup
	l := new(sync.Mutex) // lock
	for record, err := r.Read(); err == nil; record, err = r.Read() {
		wg.Add(1)
		go func(record []string) {
			defer wg.Done()
			var temp []float64
			for _, each := range record {
				if f, err := strconv.ParseFloat(each, 64); err == nil {
					temp = append(temp, f)
				}
			}
			l.Lock()                        // lock before writing
			dataset = append(dataset, temp) // write
			l.Unlock()                      // unlock
		}(record)
	}
	wg.Wait()
	return dataset
}

func main() {
	dataset := loadCSV("train.csv")
	fmt.Println(len(dataset))
}
Some errors are not handled here to keep the example minimal, but you should handle them in real code.