
Registering an http URL handler in a slice of handlers


To broadcast a message from a goroutine to multiple HTTP URL handlers, I am trying to register those handlers using the code below in main.go:

type webSocketHandler func(http.ResponseWriter, *http.Request)

type threadSafeSlice struct {
    sync.Mutex
    handlers []*webSocketHandler
}

var sliceOfHandlers threadSafeSlice

func (slice *threadSafeSlice) push(handle *webSocketHandler) { //register

    slice.Lock()
    defer slice.Unlock()

    slice.handlers = append(slice.handlers, handle)
}

where forwardMsgToClient() is the HTTP URL handler that needs to be registered.

The broadcastMessage() goroutine should broadcast a message to multiple forwardMsgToClient() handlers, as in the code below:

func main() {

    go broadcastMessage()
    http.HandleFunc("/websocket", forwardMsgToClient)
    http.ListenAndServe(":3000", nil)

}

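func forwardMsgToClient(w http.ResponseWriter, r *http.Request) {
    conn, err := upgrader.Upgrade(w, r, nil)
    if err != nil {
        return // Upgrade has already replied to the client with an HTTP error
    }
    defer conn.Close()
    for {
         // Forward message to the client upon receiving a msg from publisher
    }
}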

All the above code is in main.go

The problem is that the forwardMsgToClient() goroutine is spawned for each client after the rw, e := l.Accept() call in ../go/src/net/http/server.go.

The reason to register (push()) the HTTP URL handler function forwardMsgToClient() is so that the broadcastMessage() goroutine knows how many channels to create, one per HTTP URL handler, and can delete a channel when its handler is unregistered.


I am a bit nervous that we might need to modify /go/src/net/http/server.go to achieve this goal.


How can I register (push()) the HTTP URL handler function forwardMsgToClient() in sliceOfHandlers.handlers?


Solution

  • To broadcast a message to all connected websocket clients, do the following:

    • Add the connection to a collection on upgrade.
    • Remove the connection from the collection when the connection is closed.
    • Broadcast by iterating through the collection.

    A simple approach is:

    type Clients struct {
        sync.Mutex
        m map[*websocket.Conn]struct{}
    }
    
    var clients = Clients{m: map[*websocket.Conn]struct{}{}}
    
    func (cs *Clients) add(c *websocket.Conn) {
        cs.Lock()
        cs.m[c] = struct{}{}
        cs.Unlock()
    }
    
    func (cs *Clients) remove(c *websocket.Conn) {
        cs.Lock()
        delete(cs.m, c)
        cs.Unlock()
    }
    
    func (cs *Clients) broadcast(message []byte) {
        cs.Lock()
        defer cs.Unlock()
        // Range over the receiver's map; only the key (the connection) is needed.
        for c := range cs.m {
            c.WriteMessage(websocket.TextMessage, message)
        }
    }
    

    The handler adds and removes connections from the collection as follows:

    func forwardMsgToClient(w http.ResponseWriter, r *http.Request) {
        c, err := upgrader.Upgrade(w, r, nil)
        if err != nil {
            // handle error
            return
        }
        defer c.Close()
        clients.add(c)
        defer clients.remove(c)
    
        // Read the connection as required by the package.
        for {
            if _, _, err := c.NextReader(); err != nil {
                break
            }
        }
    }
    

    To send a message to all connected clients, call clients.broadcast(message).

    This simple approach is not production ready for a couple of reasons: it does not handle the error returned from WriteMessage, and broadcast can block on a stuck client.
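
    As a rough illustration of the first point only, the sketch below drops a client when its write fails. It is not part of the original answer: messageWriter, fakeConn and the returned count are hypothetical names introduced so the example runs without a network, on the assumption that the real connection type exposes WriteMessage(int, []byte) error as the gorilla/websocket connection does. It does not address the stuck-client problem; broadcast here still waits on each write in turn.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// messageWriter is a hypothetical interface covering the single method
// used below; a gorilla/websocket connection has a method of this shape.
type messageWriter interface {
	WriteMessage(messageType int, data []byte) error
}

// textMessage mirrors the value of websocket.TextMessage (1).
const textMessage = 1

type clients struct {
	sync.Mutex
	m map[messageWriter]struct{}
}

// broadcast writes to every client and removes those whose write fails.
// It returns the number of clients removed.
func (cs *clients) broadcast(message []byte) int {
	cs.Lock()
	defer cs.Unlock()
	removed := 0
	for c := range cs.m {
		if err := c.WriteMessage(textMessage, message); err != nil {
			delete(cs.m, c) // deleting the current key while ranging is allowed
			removed++
		}
	}
	return removed
}

// fakeConn is a stand-in for a real connection, used only for the demo.
type fakeConn struct {
	fail bool
	got  [][]byte
}

func (f *fakeConn) WriteMessage(_ int, data []byte) error {
	if f.fail {
		return errors.New("write failed")
	}
	f.got = append(f.got, data)
	return nil
}

func main() {
	good, bad := &fakeConn{}, &fakeConn{fail: true}
	cs := &clients{m: map[messageWriter]struct{}{good: {}, bad: {}}}
	removed := cs.broadcast([]byte("hi"))
	fmt.Println(removed, len(cs.m), len(good.got))
}
```

    In a real handler the failed connection would presumably also be closed, which makes the handler's read loop return and run its deferred cleanup.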

    For a more robust solution, see the hub in the Gorilla chat example. The hub interposes a channel between the broadcaster and the connection, which lets the hub broadcast without blocking. The go broadcastMessage() in the question corresponds to go hub.run() in the Gorilla example. The forwardMsgToClient handler in the question will create a *client and send it to the hub's register channel on upgrade, then send that *client to the hub's unregister channel on disconnect. The *client has a channel that is pumped to the connection.
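
    The hub pattern described above can be sketched roughly as follows. This is a simplified, standard-library-only approximation written for this answer, not the Gorilla example itself: the client here holds only a buffered send channel, and the websocket read and write pumps are left out. The field names and the buffer size of 8 are assumptions chosen for illustration.

```go
package main

import "fmt"

// client stands in for one websocket connection. In a real server a
// per-connection goroutine would drain send and write to the socket.
type client struct {
	send chan []byte
}

type hub struct {
	clients    map[*client]struct{}
	register   chan *client
	unregister chan *client
	broadcast  chan []byte
}

func newHub() *hub {
	return &hub{
		clients:    make(map[*client]struct{}),
		register:   make(chan *client),
		unregister: make(chan *client),
		broadcast:  make(chan []byte),
	}
}

// run is the only goroutine that touches the clients map, so no mutex
// is needed.
func (h *hub) run() {
	for {
		select {
		case c := <-h.register:
			h.clients[c] = struct{}{}
		case c := <-h.unregister:
			if _, ok := h.clients[c]; ok {
				delete(h.clients, c)
				close(c.send)
			}
		case msg := <-h.broadcast:
			for c := range h.clients {
				select {
				case c.send <- msg:
				default:
					// The client's queue is full: drop it rather
					// than block the whole broadcast.
					delete(h.clients, c)
					close(c.send)
				}
			}
		}
	}
}

func main() {
	h := newHub()
	go h.run()

	a := &client{send: make(chan []byte, 8)}
	b := &client{send: make(chan []byte, 8)}
	h.register <- a
	h.register <- b

	h.broadcast <- []byte("hello")
	fmt.Printf("%s %s\n", <-a.send, <-b.send)

	h.unregister <- a
	h.broadcast <- []byte("bye")
	fmt.Printf("%s\n", <-b.send)
}
```

    Because a slow client is dropped instead of waited on, one stuck connection cannot stall delivery to the others, which is the blocking problem the mutex-and-map version has.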