I am working on a REST API server and one of the features of the server is being able to notify arbitrary number of client via a websocket when a new resource is created or an existing one is modified.
I have a custom action router to bind an URL to a function and gorillas's implementation of websocket library. For IPC I have decided to rely on channels as it appears to be the idiomatic way to communicate between coroutines. Also it behaves like a pipe which is a concept I am familiar with.
A prototype for a function Create
looks like this:
func Create (res http.ResponseWriter, req *http.Request, userdata interface {}) (int, string, interface {})
As a userdata
an instance of a structure PipeSet
is passed. It is a map that is shared between all coroutines where a key is an address (a pointer to) of a Pipe
and value the same thing. The rationale here is to speed up a lookup process when deleting.
type Pipe chan string
type PipeSet struct {
sync.Mutex
Pipes map [*Pipe] *Pipe
}
func NewPipe () Pipe {
return make (Pipe)
}
func NewPipeSet () PipeSet {
var newSet PipeSet
newSet.Pipes = make (map[*Pipe] *Pipe)
return newSet
}
func (o *PipeSet) AddPipe (pipe *Pipe) {
o.Lock ()
o.Pipes[pipe] = pipe
o.Unlock ()
}
func (o *PipeSet) ForeachPipe (f func (pipe Pipe)) {
o.Lock ()
for k := range (o.Pipes) {
f (*o.Pipes[k])
}
o.Unlock ()
}
func (o *PipeSet) DeletePipe (pipe *Pipe) {
o.Lock ()
delete (o.Pipes, pipe)
o.Unlock ()
}
When a client connects via websocket a new channel (a Pipe
) is created and added to a shared PipeSet
. Then if a new resource is created a coroutine goes through an entire PipeSet
sending a message to each Pipe
. The message is then forwarded to a connected clients on the other side.
I am unable to detect whether client's websocket connection is still there. I need to know that to determine whether I should remove a Pipe
from the PipeSet
. I am relying on CloseNotifier
in this case. It never fires.
The code looks like this (excerpt):
var upgrader = websocket.Upgrader {
CheckOrigin: func (r *http.Request) bool { return true },
}
conn, err := upgrader.Upgrade (res, req, nil)
if err != nil {
marker.MarkError (err)
return http.StatusBadRequest, "", nil
}
defer conn.Close ()
exitStatus = http.StatusOK
pipe := genstore.NewPipe ()
quit := res.(http.CloseNotifier).CloseNotify ()
genStore.WSChannels.AddPipe (&pipe)
for {
log.Printf ("waiting for a message")
select {
case wsMsg = <-pipe:
log.Printf ("got a message: %s (num pipes %d)", wsMsg, len (genStore.WSChannels.Pipes))
if err = conn.WriteMessage (websocket.TextMessage, []byte (wsMsg)); err != nil {
marker.MarkError (err)
goto egress
}
case <-quit:
log.Printf ("quit...")
goto egress
}
}
egress:
genStore.WSChannels.DeletePipe (&pipe)
When you upgrade HTTP connection to a WebSocket connection using Gorilla, it hijacks that connection and net/http server stops serving it. This means, that you can't rely on a net/http events from that moment.
Check this: https://github.com/gorilla/websocket/issues/123
So, what you can do here is to start new goroutine for every new WebSocket connection, which will read data from this connection and write a message to a quit
channel on a failure.