I have this code, based on the pipelines example. walkFiles takes one or more folders (as specified in the folders parameter) and "visits" the files in every folder it is given. It also takes a done channel to allow for cancellation, but I don't think that matters for this problem.
The code works as expected when passed only one folder to walk. But when given two, it fails with the infamous fatal error: all goroutines are asleep - deadlock! It even looks like it's doing the right thing, processing the files of both folders, but it doesn't finish cleanly. What is the (probably obvious) mistake I'm making in the concurrency of this function?
Here's the code:
type result struct {
    path     string
    checksum []byte
    err      error
}

type FileData struct {
    Hash []byte
}
// walkFiles starts a goroutine per folder to walk its directory tree and send
// the path of each regular file on the string channel. It sends the result of
// each walk on the error channel. If done is closed, walkFiles abandons its work.
func (p Processor) walkFiles(done <-chan struct{}, folders []string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)

    visit := func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        select {
        case paths <- path:
        case <-done:
            return errors.New("walk canceled")
        }
        return nil
    }

    var wg sync.WaitGroup
    for i, folder := range folders {
        wg.Add(1)
        go func(f string, i int) {
            defer wg.Done()
            // No select needed for this send, since errc is buffered.
            errc <- filepath.Walk(f, visit)
        }(folder, i)
    }

    go func() {
        wg.Wait()
        close(paths)
    }()

    return paths, errc
}
func closeFile(f *os.File) {
    err := f.Close()
    if err != nil {
        fmt.Fprintf(os.Stderr, "error: %v\n", err)
        os.Exit(1)
    }
}
// process reads path names from files and sends digests of the corresponding
// files on c until either files or done is closed.
func (p Processor) process(done <-chan struct{}, files <-chan string, c chan<- result, loc *locator.Locator) {
    for f := range files {
        func() {
            file, err := os.Open(f)
            if err != nil {
                fmt.Println(err)
                return
            }
            defer closeFile(file)

            // Hashing file, producing `checksum` variable, and an `err`

            select {
            case c <- result{f, checksum, err}:
            case <-done:
                return
            }
        }()
    }
}
// MD5All reads all the files in the file trees rooted at the given folders and
// returns a map from file path to the MD5 sum of the file's contents. If a
// directory walk fails or any read operation fails, MD5All returns an error.
// In that case, MD5All does not wait for inflight read operations to complete.
func (p Processor) MD5All(folders []string) (map[string]FileData, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    paths, errc := p.walkFiles(done, folders)

    c := make(chan result)
    var wg sync.WaitGroup
    wg.Add(NUM_DIGESTERS)
    for i := 0; i < NUM_DIGESTERS; i++ {
        go func() {
            p.process(done, paths, c, loc)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()
    // End of pipeline.

    m := make(map[string]FileData)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = FileData{r.checksum}
    }

    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
func (p Processor) Start() map[string]FileData {
    m, err := p.MD5All(p.folders)
    if err != nil {
        log.Fatal(err)
    }
    return m
}
The problem is here:
    if err := <-errc; err != nil {
        return nil, err
    }
You're reading from errc only once, but every walk goroutine sends to it. errc has a buffer of one, so only the first walker to finish can send its result without blocking; every other walker is stuck on its send to errc. Because those goroutines never return, wg.Wait() never returns, paths is never closed, the digesters never finish, and the whole pipeline deadlocks. Read from errc in a for loop, once per folder.
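A minimal sketch of that fix, using the names from your snippet (firstErr is just a new local variable); I'd also size errc to the number of folders so the "no select needed" comment stays true with multiple senders:

    // In walkFiles: give errc one slot per folder so every walker can send
    // its result without ever blocking.
    errc := make(chan error, len(folders))

    // In MD5All: receive one value per folder instead of doing a single receive.
    // Keep draining after the first failure so no walker is left blocked.
    var firstErr error
    for range folders {
        if err := <-errc; err != nil && firstErr == nil {
            firstErr = err
        }
    }
    if firstErr != nil {
        return nil, firstErr
    }

With errc buffered to len(folders), no walker can ever block on its error send, so wg.Wait() always returns and paths always gets closed, even when MD5All returns early because one of the digests failed.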