After I added a small part of the file I was using (7 GB) and tried to run the program, I saw this:
fatal error: all goroutines are asleep - deadlock!
goroutine 1 [chan receive]:
main.main()
/media/developer/golang/manual/examples/sp/v2/sp.v2.go:71 +0x4a9
exit status 2
I'm completely new to Go, so I'm sorry if my question is really simple.
I am trying to stream an XML file, split it into documents, and then parse them in separate goroutines.
Example of XML file I'm using:
<?xml version="1.0" encoding="UTF-8"?>
<osm version="0.6" generator="CGImap 0.0.2">
<relation id="56688" user="kmvar" uid="56190" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
<member type="node" ref="294942404" role=""/>
<member type="node" ref="364933006" role=""/>
<tag k="name" v="Küstenbus Linie 123"/>
<tag k="network" v="VVW"/>
<tag k="route" v="bus"/>
<tag k="type" v="route"/>
</relation>
<relation id="98367" user="jdifh" uid="92834" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
<member type="node" ref="294942404" role=""/>
<member type="way" ref="4579143" role=""/>
<member type="node" ref="249673494" role=""/>
<tag k="name" v="Küstenbus Linie 123"/>
<tag k="network" v="VVW"/>
<tag k="operator" v="Regionalverkehr Küste"/>
<tag k="ref" v="123"/>
</relation>
<relation id="72947" user="schsu" uid="92374" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
<member type="node" ref="294942404" role=""/>
<tag k="name" v="Küstenbus Linie 123"/>
<tag k="type" v="route"/>
</relation>
<relation id="93742" user="doiff" uid="61731" visible="true" version="28" changeset="6947637" timestamp="2011-01-12T14:23:49Z">
<member type="node" ref="294942404" role=""/>
<member type="node" ref="364933006" role=""/>
<tag k="route" v="bus"/>
<tag k="type" v="route"/>
</relation>
</osm>
I have this snippet of code:
package main
import (
"encoding/xml"
"bufio"
"fmt"
"os"
"io"
)
type RS struct {
I string `xml:"id,attr"`
M []struct {
I string `xml:"ref,attr"`
T string `xml:"type,attr"`
R string `xml:"role,attr"`
} `xml:"member"`
T []struct {
K string `xml:"k,attr"`
V string `xml:"v,attr"`
} `xml:"tag"`
}
func main() {
p1D, err := os.Open("/media/developer/Transcend/osm/xml/relations.xml")
if err != nil {
fmt.Println(err)
os.Exit(1)
}
defer p1D.Close()
reader := bufio.NewReader(p1D)
var count int32
var element string
channel := make(chan RS) // channel
for {
p2Rc, err := reader.ReadSlice('\n')
if err != nil {
if err == io.EOF {
break
} else {
fmt.Println(err)
os.Exit(1)
}
}
var p2Rs = string(p2Rc)
if p2Rc[2] == 114 {
count++
if (count != 1) {
go parseRelation(element, channel)
}
element = ""
element += p2Rs
} else {
element += p2Rs
}
}
for i := 0; i < 5973974; i++ {
fmt.Println(<- channel)
}
}
func parseRelation(p1E string, channel chan RS) {
var rs RS
xml.Unmarshal([]byte(p1E), &rs)
channel <- rs
}
It is supposed to print each struct, but I see nothing. The program just hangs.
I tested the streamer and splitter (I added fmt.Println(rs) in parseRelation before sending the message into the channel) and could see the structs. So the problem is in sending and receiving messages.
I have no idea how to solve this. I tried changing the type of the messages in the channel (from RS to string) and sending just one string each time, but that didn't help either (I still saw nothing).
First, let's get this out of the way: you can't parse XML line by line. You're lucky that your file happens to be one tag per line, but that can't be taken for granted. You must parse the whole XML document.
By processing line by line you're trying to shove <tag> and <member> into a struct designed for <relation>. Instead, use xml.NewDecoder and let that process the file for you.
package main
import (
"encoding/xml"
"fmt"
"os"
"log"
)
type Osm struct {
XMLName xml.Name `xml:"osm"`
Relations []Relation `xml:"relation"`
}
type Relation struct {
XMLName xml.Name `xml:"relation"`
ID string `xml:"id,attr"`
User string `xml:"user,attr"`
Uid string `xml:"uid,attr"`
Members []Member `xml:"member"`
Tags []Tag `xml:"tag"`
}
type Member struct {
XMLName xml.Name `xml:"member"`
Ref string `xml:"ref,attr"`
Type string `xml:"type,attr"`
Role string `xml:"role,attr"`
}
type Tag struct {
XMLName xml.Name `xml:"tag"`
Key string `xml:"k,attr"`
Value string `xml:"v,attr"`
}
func main() {
reader, err := os.Open("test.xml")
if err != nil {
log.Fatal(err)
}
defer reader.Close()
decoder := xml.NewDecoder(reader)
osm := &Osm{}
// osm is already a pointer, so pass it directly
err = decoder.Decode(osm)
if err != nil {
log.Fatal(err)
}
fmt.Println(osm)
}
Osm and the other structs mirror the XML structure you expect. decoder.Decode(osm) fills them in from the document.
If you just want to extract part of the XML, see the answers to How to extract part of an XML file as a string?.
The rest of the answer will cover just the use of channels and goroutines. The XML part will be dropped.
If you add a few debugging statements you'll find that parseRelation is never called, which means nothing is ever sent on the channel and fmt.Println(<- channel) sits waiting on an empty channel that is never closed. So once you're done processing, close the channel.
for {
p2Rc, err := reader.ReadSlice('\n')
...
}
close(channel)
Now we get { [] []} 5973974 times, because a receive from a closed channel returns the zero value immediately.
for i := 0; i < 5973974; i++ {
fmt.Println(<- channel)
}
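That's trying to read from the channel exactly 5973974 times, whether or not that many values were sent. Hard-coding the count defeats the point of channels. Instead, read from the channel using range, which stops once the channel is closed and empty.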
for thing := range channel {
fmt.Println(thing)
}
Now at least it finishes!
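The difference between the two loops comes down to how a closed channel behaves. A small sketch to illustrate (the helper name drain and the int channel are assumptions chosen for brevity): range stops when the channel is closed and empty, while a plain receive on a closed channel returns the zero value without blocking.

```go
package main

import "fmt"

// drain is a hypothetical helper: it collects everything sent on ch
// until ch is closed.
func drain(ch <-chan int) []int {
	var got []int
	for v := range ch { // the loop ends when ch is closed and empty
		got = append(got, v)
	}
	return got
}

func main() {
	ch := make(chan int, 3)
	ch <- 1
	ch <- 2
	close(ch)

	fmt.Println(drain(ch)) // [1 2]

	// A plain receive on a closed, empty channel does not block:
	// it yields the zero value, and ok reports that nothing was sent.
	v, ok := <-ch
	fmt.Println(v, ok) // 0 false
}
```

This is why the counted loop printed millions of empty structs after close(channel): each receive succeeded immediately with a zero-valued RS.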
But there's a new problem. If it actually finds a thing, like if you change if p2Rc[2] == 114 { to if p2Rc[2] == 32 {, you'll get a panic: send on closed channel. This is because parseRelation is running concurrently with the reader and might try to send after the main reading code has finished and closed the channel. You have to make sure every sender is done before closing the channel.
To fix this requires a fairly major redesign.
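The rule "close only after every sender is done" can be sketched on its own before looking at the file-reading code. This is an illustration under assumptions, not the original program: the helper upperAll and the use of strings.ToUpper as the "work" are made up, and a sync.WaitGroup tracks the senders.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"
)

// upperAll is a hypothetical helper: it starts one goroutine per input,
// each of which sends its result on a shared channel.
func upperAll(inputs []string) []string {
	results := make(chan string)
	var wg sync.WaitGroup

	for _, in := range inputs {
		wg.Add(1)
		go func(s string) {
			defer wg.Done()
			results <- strings.ToUpper(s)
		}(in)
	}

	// Close the channel only after every sender has finished,
	// so no goroutine can hit "send on closed channel".
	go func() {
		wg.Wait()
		close(results)
	}()

	var out []string
	for r := range results { // ends when results is closed
		out = append(out, r)
	}
	sort.Strings(out) // goroutines finish in no particular order
	return out
}

func main() {
	fmt.Println(upperAll([]string{"b", "a", "c"})) // [A B C]
}
```

One goroutine per item is fine for a handful of inputs. For millions of elements, a fixed number of workers reading from an input channel is usually a better fit.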
Here's an example of a simple program that reads lines from a file, puts them into a channel, and has a worker read from that channel.
func main() {
reader, err := os.Open("test.xml")
if err != nil {
log.Fatal(err)
}
defer reader.Close()
// Use the simpler bufio.Scanner
scanner := bufio.NewScanner(reader)
// A buffered channel for input
lines := make(chan string, 10)
// Work on the lines
go func() {
for line := range lines {
fmt.Println(line)
}
}()
// Read lines into the channel
for scanner.Scan() {
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
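// Caution: nothing waits for the worker. When main returns, the program exits.
}
This appears to work because main does the reading itself and the worker usually keeps up, but it is not actually safe. When main returns, the program exits immediately: it does not wait for other goroutines and it does not close any channels, so lines still sitting in the buffer may never be printed. The problem becomes obvious if the reader and the worker are both goroutines: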
// A buffered channel for input
lines := make(chan string, 10)
// Work on the lines
go func() {
for line := range lines {
fmt.Println(line)
}
}()
// Read lines into the channel
go func() {
for scanner.Scan() {
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()
Empty. main returns and the program exits before the goroutines can do their job. We need a way to make main wait until processing is finished. There are a couple of ways to do this. One is to use another channel to synchronize processing.
// A buffered channel for input
lines := make(chan string, 10)
// A channel for main to wait for
done := make(chan bool, 1)
// Work on the lines
go func() {
for line := range lines {
fmt.Println(line)
}
// Indicate the worker is done
done <- true
}()
// Read lines into the channel
go func() {
// Explicitly close `lines` when we're done so the workers can finish
defer close(lines)
for scanner.Scan() {
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()
// main will wait until there's something to read from `done`
<-done
Now main will fire off the reader and worker goroutines and block waiting for something on done. The reader will fill lines until it's done reading and then close it. In parallel, the worker will read from lines and write to done once lines has been closed and drained.
The other option is to use sync.WaitGroup.
// A buffered channel for input
lines := make(chan string, 10)
var wg sync.WaitGroup
// Note that there is one more thing to wait for
wg.Add(1)
go func() {
// Tell the WaitGroup we're done
defer wg.Done()
for line := range lines {
fmt.Println(line)
}
}()
// Read lines into the channel
go func() {
defer close(lines)
for scanner.Scan() {
lines <- scanner.Text()
}
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()
// Wait until everything in the WaitGroup is done
wg.Wait()
As before, main launches the reader and worker goroutines, but now it adds 1 to a WaitGroup just before launching the worker. Then it blocks in wg.Wait() until the count drops to zero. The reader works the same as before, closing the lines channel when it's finished. The worker now calls wg.Done() when it's finished, decrementing the WaitGroup's count and allowing wg.Wait() to return.
Each technique has advantages and disadvantages. A done channel is more flexible, chains better, and can be safer once you can wrap your head around it. WaitGroups are simpler and easier to understand, but every goroutine involved has to share the same WaitGroup variable.
If we want to add more stages to this chain of processing, we can. Say we have one goroutine that reads the lines, one that turns them into XML elements, and one that does something with the elements.
// A buffered channel for input
lines := make(chan []byte, 10)
elements := make(chan *RS)
var wg sync.WaitGroup
// Worker goroutine, turn lines into RS structs
wg.Add(1)
go func() {
defer wg.Done()
defer close(elements)
for line := range lines {
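// Guard against lines shorter than three bytes
if len(line) > 2 && line[2] == 32 {
fmt.Println("Begin")
fmt.Println(string(line))
fmt.Println("End")
rs := &RS{}
// rs is already a pointer; report lines that fail to parse
if err := xml.Unmarshal(line, rs); err != nil {
log.Println(err)
continue
}
elements <- rs
}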
}
}()
// File reader
go func() {
defer close(lines)
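for scanner.Scan() {
// Copy the line: the slice returned by scanner.Bytes() may be
// overwritten by the next call to Scan
lines <- append([]byte(nil), scanner.Bytes()...)
}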
if err := scanner.Err(); err != nil {
log.Fatal(err)
}
}()
// Element reader
wg.Add(1)
go func() {
defer wg.Done()
for element := range elements {
fmt.Println(element)
}
}()
wg.Wait()
This produces empty structs because you're trying to shove individual lines of XML into a struct representing the complete <relation> element. But it demonstrates how you can add more workers to the chain.