Doesn't receive a message from a channel


Edit:

After I added a small part of the file I am using (7 GB) and ran the program, I saw this:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
main.main()
    /media/developer/golang/manual/examples/sp/v2/sp.v2.go:71 +0x4a9
exit status 2

Situation:

I'm completely new to Go, so I'm sorry if my question is really simple.

I am trying to stream an XML file, split it into documents, and then parse them in different goroutines.

Example of XML file I'm using:

<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="CGImap 0.0.2">
    <relation id="56688" user="kmvar" uid="56190" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
        <member type="node" ref="294942404" role=""/>
        <member type="node" ref="364933006" role=""/>
        <tag k="name" v="Küstenbus Linie 123"/>
        <tag k="network" v="VVW"/>
        <tag k="route" v="bus"/>
        <tag k="type" v="route"/>
    </relation>
    <relation id="98367" user="jdifh" uid="92834" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
        <member type="node" ref="294942404" role=""/>
        <member type="way" ref="4579143" role=""/>
        <member type="node" ref="249673494" role=""/>
        <tag k="name" v="Küstenbus Linie 123"/>
        <tag k="network" v="VVW"/>
        <tag k="operator" v="Regionalverkehr Küste"/>
        <tag k="ref" v="123"/>
    </relation>
    <relation id="72947" user="schsu" uid="92374" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
        <member type="node" ref="294942404" role=""/>
        <tag k="name" v="Küstenbus Linie 123"/>
        <tag k="type" v="route"/>
    </relation>
    <relation id="93742" user="doiff" uid="61731" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
        <member type="node" ref="294942404" role=""/>
        <member type="node" ref="364933006" role=""/>
        <tag k="route" v="bus"/>
        <tag k="type" v="route"/>
    </relation>
</osm>

I have this snippet of code:

package main

import (
  "encoding/xml"
  "bufio"
  "fmt"
  "os"
  "io"
)

type RS struct {
  I string `xml:"id,attr"`
  M []struct {
    I string `xml:"ref,attr"`
    T string `xml:"type,attr"`
    R string `xml:"role,attr"`
  } `xml:"member"`
  T []struct {
    K string `xml:"k,attr"`
    V string `xml:"v,attr"`
  } `xml:"tag"`
}

func main() {
  p1D, err := os.Open("/media/developer/Transcend/osm/xml/relations.xml")

  if err != nil {
    fmt.Println(err)
    os.Exit(1)
  }

  defer p1D.Close()

  reader := bufio.NewReader(p1D)

  var count int32
  var element string

  channel := make(chan RS) // channel

  for {
    p2Rc, err := reader.ReadSlice('\n')
    if err != nil {
      if err == io.EOF {
        break
      } else {
        fmt.Println(err)
        os.Exit(1)
      }
    }

    var p2Rs = string(p2Rc)

    if p2Rc[2] == 114 {
      count++

      if (count != 1) {
        go parseRelation(element, channel)
      }

      element = ""
      element += p2Rs
    } else {
      element += p2Rs
    }
  }

  for i := 0; i < 5973974; i++ {
    fmt.Println(<- channel)
  }
}

func parseRelation(p1E string, channel chan RS) {
  var rs RS
  xml.Unmarshal([]byte(p1E), &rs)

  channel <- rs
}

It is supposed to print each struct, but I see nothing. The program just hangs.

I tested streamer and splitter (just added fmt.Println(rs) in function parseRelation before sending message into the channel). I could see structs. So, the problems are in sending and receiving messages.

Problem:

I have no idea how to solve this issue. Tried changing the type of the messages in the channel (from RS to string) and sending just one string every time. But it also didn't help (I could see nothing)


Solution

  • First, let's get this out of the way: you can't parse XML line by line. You're lucky that your file happens to be one tag per line, but that can't be taken for granted. You must parse the whole XML document.

    By processing line by line you're trying to shove <tag> and <member> into a struct designed for <relation>. Instead, use xml.NewDecoder and let that process the file for you.

    package main
    
    import (
        "encoding/xml"
        "fmt"
        "os"
        "log"
    )
    
    type Osm struct {
        XMLName     xml.Name    `xml:"osm"`
        Relations   []Relation  `xml:"relation"`
    }
    type Relation struct {
        XMLName     xml.Name    `xml:"relation"`
        ID          string      `xml:"id,attr"`
        User        string      `xml:"user,attr"`
        Uid         string      `xml:"uid,attr"`
        Members     []Member    `xml:"member"`
        Tags        []Tag       `xml:"tag"`
    }
    type Member struct {
        XMLName     xml.Name    `xml:"member"`
        Ref         string      `xml:"ref,attr"`
        Type        string      `xml:"type,attr"`
        Role        string      `xml:"role,attr"`
    }
    type Tag struct {
        XMLName     xml.Name    `xml:"tag"`
        Key         string      `xml:"k,attr"`
        Value       string      `xml:"v,attr"`
    }
    
    func main() {
        reader, err := os.Open("test.xml")
        if err != nil {
            log.Fatal(err)
        }
        defer reader.Close()
    
        decoder := xml.NewDecoder(reader)
    
        osm := &Osm{}
        err = decoder.Decode(osm)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Println(osm)
    }
    

    Osm and the other structs are analogous to the XML schema you expect. decoder.Decode(osm) applies that schema. Note that this builds the whole document in memory, which matters for a multi-gigabyte file.

    If you just want to extract part of the XML, see the answers to How to extract part of an XML file as a string?.

    The rest of the answer will cover just the use of channels and goroutines. The XML part will be dropped.


    If you add a few debugging statements, you'll find that parseRelation is never called: every <relation> line in the sample starts with four spaces, so p2Rc[2] is 32 (a space), never 114 ('r'). Nothing is ever sent on the channel, so fmt.Println(<- channel) blocks forever on a channel that is empty and never closed. So once you're done processing, close the channel.

      for {
        p2Rc, err := reader.ReadSlice('\n')
    
        ...
      }
      close(channel)
    

    Now we get { [] []} 5973974 times, because a receive from a closed channel returns the zero value immediately.
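
    A small sketch of that behaviour, using the two-value receive form to tell a real value from the zero value of a closed channel. receiveN is a hypothetical helper, and int stands in for RS.

```go
package main

import "fmt"

// receiveN (hypothetical helper) receives n times from ch and reports every
// value it got, plus how many of those receives carried a real value.
func receiveN(ch <-chan int, n int) (values []int, real int) {
	for i := 0; i < n; i++ {
		v, ok := <-ch // ok is false once ch is closed and drained
		values = append(values, v)
		if ok {
			real++
		}
	}
	return values, real
}

func main() {
	ch := make(chan int, 1)
	ch <- 7
	close(ch)

	values, real := receiveN(ch, 3)
	fmt.Println(values, real) // prints "[7 0 0] 1"
}
```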

    for i := 0; i < 5973974; i++ {
      fmt.Println(<- channel)
    }
    

    That's trying to read from the channel 5973974 times. That defeats the point of channels. Instead, read from the channel using range.

    for thing := range channel {
        fmt.Println(thing)
    }
    

    Now at least it finishes!

    But there's a new problem. If it actually finds a thing, like if you change if p2Rc[2] == 114 { to if p2Rc[2] == 32 {, you'll get a panic: send on closed channel. This is because parseRelation is running in parallel to the reader and might try to write after the main reading code is finished and has closed the channel. You have to make sure everyone using the channel is done before closing it.

    To fix this requires a fairly major redesign.


    Here's an example of a simple program that reads lines from a file, puts them into a channel, and has a worker read from that channel.

    func main() {
        reader, err := os.Open("test.xml")
        if err != nil {
            log.Fatal(err)
        }
        defer reader.Close()
    
        // Use the simpler bufio.Scanner
        scanner := bufio.NewScanner(reader)
    
        // A buffered channel for input
        lines := make(chan string, 10)
    
        // Work on the lines
        go func() {
            for line := range lines {
                fmt.Println(line)
            }
        }()
    
        // Read lines into the channel
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    
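        // When main returns the program exits; it does not wait for other goroutines.
    }
    

    This mostly works because main itself feeds the channel, so the worker keeps up while main is still running. But main does not wait for the worker: when main returns, the program exits, and any lines still sitting in the buffer may never be printed. And what if the reader and writer were both goroutines?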

    // A buffered channel for input
    lines := make(chan string, 10)
    
    // Work on the lines
    go func() {
        for line := range lines {
            fmt.Println(line)
        }
    }()
    
    // Read lines into the channel
    go func() {
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    }()
    

    Nothing is printed. main returns immediately and the program exits before the goroutines can do their job. We need a way to let main know to wait until processing is finished. There are a couple of ways to do this. One is with another channel to synchronize processing.

    // A buffered channel for input
    lines := make(chan string, 10)
    
    // A channel for main to wait for
    done := make(chan bool, 1)
    
    // Work on the lines
    go func() {
        for line := range lines {
            fmt.Println(line)
        }
    
        // Indicate the worker is done
        done <- true
    }()
    
    // Read lines into the channel
    go func() {
        // Explicitly close `lines` when we're done so the workers can finish
        defer close(lines)
    
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    }()
    
    // main will wait until there's something to read from `done`
    <-done
    

    Now main will fire off the reader and worker goroutines and block waiting for something on done. The reader will fill lines until it's done reading and then close it. In parallel, the worker will read from lines and write to done once it's done reading.

    The other option is to use sync.WaitGroup.

    // A buffered channel for input
    lines := make(chan string, 10)
    
    var wg sync.WaitGroup
    
    // Note that there is one more thing to wait for
    wg.Add(1)
    go func() {
        // Tell the WaitGroup we're done
        defer wg.Done()
    
        for line := range lines {
            fmt.Println(line)
        }
    }()
    
    // Read lines into the channel
    go func() {
        defer close(lines)
    
        for scanner.Scan() {
            lines <- scanner.Text()
        }
        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    }()
    
    // Wait until everything in the WaitGroup is done
    wg.Wait()
    

    As before, main launches the reader and worker goroutines, but now it adds 1 to a WaitGroup just before launching the worker. Then it blocks until wg.Wait() returns. The reader works the same as before, closing the lines channel when it's finished. The worker now calls wg.Done() when it's finished, decrementing the WaitGroup's count and allowing wg.Wait() to return.

    Each technique has advantages and disadvantages. done is more flexible, it chains better, and can be safer if you can wrap your head around it. WaitGroups are simpler and easier to wrap your head around, but require that every goroutine share a variable.
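
    The WaitGroup version extends to several workers reading from the same channel. A minimal sketch, assuming a hypothetical countBytes helper that takes its lines from a slice instead of a file and simply sums their lengths:

```go
package main

import (
	"fmt"
	"sync"
)

// countBytes (hypothetical helper) fans the input out to `workers`
// goroutines and returns the total number of bytes they saw.
func countBytes(input []string, workers int) int {
	lines := make(chan string, 10)

	var (
		wg    sync.WaitGroup
		mu    sync.Mutex // protects total
		total int
	)

	// Start the workers; each one counts as one thing to wait for.
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for line := range lines {
				mu.Lock()
				total += len(line)
				mu.Unlock()
			}
		}()
	}

	// Feed the channel, then close it so the workers' range loops end.
	for _, line := range input {
		lines <- line
	}
	close(lines)

	// Wait until every worker has drained the channel and returned.
	wg.Wait()
	return total
}

func main() {
	fmt.Println(countBytes([]string{"<osm>", "</osm>"}, 3)) // prints "11"
}
```

    Only the single sender closes lines, and it does so before wg.Wait(), so no worker can be left blocked on an open channel.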


    If we wanted to add to this chain of processing, we can do so. Say we have a goroutine that reads the lines, one that processes them into XML elements, and one that does something with the elements.

    // A buffered channel for input
    lines := make(chan []byte, 10)
    elements := make(chan *RS)
    
    var wg sync.WaitGroup
    
    // Worker goroutine, turn lines into RS structs
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(elements)
    
        for line := range lines {
            if len(line) > 2 && line[2] == 32 { // guard against short lines
                fmt.Println("Begin")
                fmt.Println(string(line))
                fmt.Println("End")
    
                rs := &RS{}
                xml.Unmarshal(line, rs) // rs is already a pointer; error ignored here
                elements <- rs
            }
        }
    }()
    
    // File reader
    go func() {
        defer close(lines)
    
        for scanner.Scan() {
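            // Copy the bytes: the slice from scanner.Bytes() may be overwritten by the next Scan
            lines <- append([]byte(nil), scanner.Bytes()...)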
        }
        if err := scanner.Err(); err != nil {
            log.Fatal(err)
        }
    }()
    
    // Element reader
    wg.Add(1)
    go func() {
        defer wg.Done()
    
        for element := range elements {
            fmt.Println(element)
        }
    }()
    
    wg.Wait()
    

    This produces empty structs because you're trying to shove individual lines of XML into a struct representing the complete <relation> tag. But it demonstrates how you can add more workers to the chain.
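
    Coming back to the original design of one goroutine per element, all sending on one channel: the "send on closed channel" panic goes away if the channel is closed only after every sender has finished. A sketch of that pattern, where processAll is a hypothetical stand-in for the reader loop and strings.ToUpper stands in for parseRelation:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// processAll (hypothetical helper) starts one goroutine per input, all
// sending on the same results channel, and collects what they send.
func processAll(inputs []string) []string {
	results := make(chan string)
	var senders sync.WaitGroup

	for _, in := range inputs {
		senders.Add(1)
		go func(s string) {
			defer senders.Done()
			results <- strings.ToUpper(s) // stand-in for the real parsing work
		}(in)
	}

	// Close results only after every sender has finished.
	go func() {
		senders.Wait()
		close(results)
	}()

	// range ends when results is closed, so no element count is needed.
	var out []string
	for r := range results {
		out = append(out, r)
	}
	sort.Strings(out) // arrival order is not deterministic
	return out
}

func main() {
	fmt.Println(processAll([]string{"bus", "route", "vvw"})) // prints "[BUS ROUTE VVW]"
}
```

    With millions of elements, a fixed number of workers is likely a better fit than one goroutine per element, but the closing rule stays the same.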