Search code examples
gowebsocketserver-sent-eventsgo-gin

How to send message to all connections in pool


When a server side event is added to a stream, how do I make sure all clients currently connected to that stream receive the event, i.e. how do I loop through all clients and send them the message before discarding the message , is this even possible with sse and Go?

summarised pseudo code of what I want to achieve below

package main

import (
    "github.com/gin-contrib/sse"
    "github.com/gin-gonic/gin"
    "net/http"
)

func main() {

    router := gin.New()
    router.Use(gin.Logger())

    var events = make(chan sse.Event, 100)

    router.GET("/api/addUser/event", func(c *gin.Context) {

        c.Header("Content-Type", "text/event-stream")
        c.Header("Access-Control-Allow-Origin", "*")
        c.Header("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers")

        // if events chan has an event
        // Send event to all connected clients

        if( we have events then send them to all clients){

            event := <-events
            _ = sse.Encode(c.Writer, event)
        }

    })

    router.POST("/api/addUser", func(c *gin.Context) {

        //On user add
        //Add event to events chan
        events <- sse.Event{
            Event: "newChiitiko",
            Id:    "1",
            Data:  "New Chiitiko Event",
        }

        c.JSON(http.StatusOK, "okay")
    })

    _ = router.Run(":5000")
}

Solution

  • It's hard to do it with single channel. The simplest answer is create channel for each connection.

    Like:

    mu := new(sync.Mutex)
    var eventChans []sse.Event
    
    router.GET("/api/addUser/event", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Access-Control-Allow-Origin", "*")
        c.Header("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers")
    
        // Add own channel to the pool.
        events := make(chan sse.Event)
        mu.Lock()
        eventChans = append(eventChans, events)
        mu.Unlock()
    
        // Listen for the events.
        for(event := range events) {
            sse.Encode(c.Writer, event)
        }
    })
    
    router.POST("/api/addUser", func(c *gin.Context) {
        mu.Lock()
        for(_, events := range eventChans) {
            events <- sse.Event{ ... }
        }
        mu.Unlock()
    
        c.JSON(http.StatusOK, "okay")
    })
    

    Or use sync.Cond.

    cond := sync.NewCond(new(sync.Mutex))
    var event *sse.Event
    
    router.GET("/api/addUser/event", func(c *gin.Context) {
        c.Header("Content-Type", "text/event-stream")
        c.Header("Access-Control-Allow-Origin", "*")
        c.Header("Access-Control-Allow-Headers", "access-control-allow-origin, access-control-allow-headers")
    
        for {
            // Wait for event.
            cond.L.Lock()
            for(event == nil) {
                cond.Wait()
            }
    
            sse.Encode(c.Writer, event)
        }
    })
    
    router.POST("/api/addUser", func(c *gin.Context) {
        cond.L.Lock()
        event = sse.Event{ ... }
        cond.L.Unlock()
    
        cond.Broadcast()
    
        c.JSON(http.StatusOK, "okay")
    })