Search code examples
sockets, go, scaling

Golang - Scaling a websocket client for multiple connections to different servers


I have a websocket client. In reality, it is far more complex than the basic code shown below. I now need to scale this client code to open connections to multiple servers. Ultimately, the tasks that need to be performed when a message is received from any of the servers are identical. What would be the best approach to handle this? As I said above, the actual code that runs when a message is received is far more complex than shown in the example.

package main

import (
        "flag"
        "log"
        "net/url"
        "os"
        "os/signal"
        "time"

        "github.com/gorilla/websocket"
)

// addr is the value of the -addr command-line flag (default "localhost:1234").
// main uses it as the Host part of the websocket URL it dials.
var addr = flag.String("addr", "localhost:1234", "http service address")

// main dials one websocket server at *addr, logs every message it receives,
// and writes the current time to the server once per second. It stops when
// the read side ends, a write fails, or the process receives an interrupt; on
// interrupt it sends a normal-closure close message and gives the server up
// to one second to close the connection.
func main() {
        flag.Parse()
        log.SetFlags(0)

        sigCh := make(chan os.Signal, 1)
        signal.Notify(sigCh, os.Interrupt)

        // Alternative endpoint: url.URL{Scheme: "ws", Host: *addr, Path: "/echo"}
        target := url.URL{Scheme: "ws", Host: *addr, Path: "/"}
        log.Printf("connecting to %s", target.String())

        conn, _, dialErr := websocket.DefaultDialer.Dial(target.String(), nil)
        if dialErr != nil {
                log.Fatal("dial:", dialErr)
        }
        defer conn.Close()

        // readerDone is closed by the reader goroutine once ReadMessage fails,
        // which is how the write loop below learns the connection is gone.
        readerDone := make(chan struct{})
        go func() {
                defer close(readerDone)
                for {
                        _, payload, readErr := conn.ReadMessage()
                        if readErr != nil {
                                log.Println("read:", readErr)
                                return
                        }
                        log.Printf("recv: %s", payload)
                }
        }()

        tick := time.NewTicker(time.Second)
        defer tick.Stop()

        for {
                select {
                case <-readerDone:
                        return

                case now := <-tick.C:
                        if writeErr := conn.WriteMessage(websocket.TextMessage, []byte(now.String())); writeErr != nil {
                                log.Println("write:", writeErr)
                                return
                        }

                case <-sigCh:
                        log.Println("interrupt")

                        // Ask the server to close, then wait (bounded by one
                        // second) for the reader to observe the close.
                        closeFrame := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
                        if writeErr := conn.WriteMessage(websocket.CloseMessage, closeFrame); writeErr != nil {
                                log.Println("write close:", writeErr)
                                return
                        }
                        select {
                        case <-readerDone:
                        case <-time.After(time.Second):
                        }
                        return
                }
        }
}

Solution

  • Modify the interrupt handling to close a channel on interrupt. This allows multiple goroutines to wait on the event by waiting for the channel to close.

    shutdown := make(chan struct{})
    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)
    go func() {
        <-interrupt
        log.Println("interrupt")
        close(shutdown)
    }()
    

    Move the per-connection code to a function. This code is a copy and paste from the question with two changes: the interrupt channel is replaced with the shutdown channel; the function notifies a sync.WaitGroup when the function is done.

    // connect dials the websocket endpoint u, logs every message received on
    // it, and writes the current time to it once per second. It returns when
    // the read side ends, a write fails, or shutdown is closed; in the shutdown
    // case it first sends a normal-closure close message and waits up to one
    // second for the reader goroutine to finish.
    //
    // wg.Done is called on every return path. A failed dial is logged and only
    // this connection is abandoned, so the process and the other connections
    // keep running.
    func connect(u string, shutdown chan struct{}, wg *sync.WaitGroup) {
        defer wg.Done()

        log.Printf("connecting to %s", u)
        c, _, err := websocket.DefaultDialer.Dial(u, nil)
        if err != nil {
            // Do not use log.Fatal here: it exits the whole process from
            // inside one goroutine, which would drop every other connection
            // without its close handshake or deferred Close.
            log.Println("dial:", err)
            return
        }
        defer c.Close()

        // done is closed by the reader goroutine when ReadMessage fails.
        done := make(chan struct{})

        go func() {
            defer close(done)
            for {
                _, message, err := c.ReadMessage()
                if err != nil {
                    log.Println("read:", err)
                    return
                }
                log.Printf("recv: %s", message)
            }
        }()

        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()

        for {
            select {
            case <-done:
                return
            case t := <-ticker.C:
                err := c.WriteMessage(websocket.TextMessage, []byte(t.String()))
                if err != nil {
                    log.Println("write:", err)
                    return
                }
            case <-shutdown:
                // Cleanly close the connection by sending a close message and then
                // waiting (with timeout) for the server to close the connection.
                err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
                if err != nil {
                    log.Println("write close:", err)
                    return
                }
                select {
                case <-done:
                case <-time.After(time.Second):
                }
                return
            }
        }
    }
    

    Declare a sync.WaitGroup in main(). For each websocket endpoint that you want to connect to, increment the WaitGroup and start a goroutine to connect that endpoint. After starting the goroutines, wait on the WaitGroup for the goroutines to complete.

    var wg sync.WaitGroup
    for _, u := range endpoints { // endpoints is []string 
                                  // where elements are URLs 
                                  // of endpoints to connect to.
        wg.Add(1)
        go connect(u, shutdown, &wg)
    }
    wg.Wait()
    

    The code above with an edit to make it run against Gorilla's echo example server is posted on the playground.