For a goal to broadcast message from a goroutine to multiple http URL handlers, I am trying to register these http URL handlers, with below code in main.go
:
type webSocketHandler func(http.ResponseWriter, *http.Request)
type threadSafeSlice struct {
sync.Mutex
handlers []*webSocketHandler
}
var sliceOfHandlers threadSafeSlice
func (slice *threadSafeSlice) push(handle *webSocketHandler) { //register
slice.Lock()
defer slice.Unlock()
slice.handlers = append(slice.handlers, handle)
}
where forWardMsgToClient()
is the http URL handler that need to be registered,
broadCastMessage()
goroutine can broadcast message to multiple forWardMsgToClient()
handlers, in the below code:
func main() {
go broadcastMessage()
http.HandleFunc("/websocket", forwardMsgToClient)
http.ListenAndServe(":3000", nil)
}
func forwardMsgToClient(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
for {
// Forward message to the client upon receiving a msg from publisher
}
}
All the above code is in main.go
But the problem is, goroutine forwardMsgToClient()
gets spawned for respective client after rw, e := l.Accept()
call in ../go/src/net/http/server.go
.
Reason to register(push()
) http URL handler function(forwardMsgToClient()
) is to make broadcastMessage()
goroutine know the number of channels to create for all http URL handlers and delete the channel when un-registering http URL handler function(forwardMsgToClient()
).
Bit nervous, if we need to modify /go/src/net/http/server.go
to achieve this goal
How to register(push()
) a http URL handler function forwardMsgToClient()
in sliceOfHandlers.handlers
?
To broadcast a message to all connected websocket clients, do the following:
A simple approach is:
type Clients struct {
sync.Mutex
m map[*websocket.Conn]struct{}
}
var clients = Clients{m: map[*websocket.Conn]struct{}{}}
func (cs *Clients) add(c *websocket.Conn) {
cs.Lock()
cs.m[c] = struct{}{}
cs.Unlock()
}
func (cs *Clients) remove(c *websocket.Conn) {
cs.Lock()
delete(cs.m, c)
cs.Unlock()
}
func (cs *Clients) broadcast(message []byte) {
cs.Lock()
defer cs.Unlock()
for c, _ := range m {
c.WriteMessage(websocket.TextMessage, message)
}
}
The handler adds and removes connections from the collection as follows:
func forwardMsgToClient(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
// handle error
}
defer c.Close()
clients.add(c)
defer clients.remove(c)
// Read the connection as required by the package.
for {
if _, _, err := c.NextReader(); err != nil {
break
}
}
}
To send a message to all connected clients, call clients.broadcast(message)
.
This simple approach is not production ready for a couple of reasons: it does not the handle the error returned from WriteMessage, broadcast can block on a stuck client.
For a more robust solution, see Gorilla chat example hub. The hub interposes a channel between the broadcaster and the connection, thus allowing the hub to broadcast without blocking. The go broadcastMessage()
in the question corresponds to go hub.run()
in the Gorilla example. The forwardMsgToClient
handler in the question will create a *client
and sent it to hub register
channel on upgrade and send that *client
to the hub unregister
channel on disconnect. The *client
has a channel that's pumped to the connection.