go concurrency websocket synchronization gorilla

Syncing websocket loops with channels in Golang


I'm facing a dilemma here trying to keep certain websockets in sync for a given user. Here's the basic setup:

type msg struct {
    Key         string
    Value   string
}

type connStruct struct {
    //...

    ConnRoutineChans []*chan string
    LoggedIn        bool
    Login       string

    //...

    Sockets         []*websocket.Conn
}

var (
    //...

    /*  LIST OF CONNECTED USERS AND THEIR IP ADDRESSES  */

        guestMap sync.Map

)

func main() {
    post("Started...")
    rand.Seed(time.Now().UTC().UnixNano())
    http.HandleFunc("/wss", wsHandler)
    panic(http.ListenAndServeTLS("...", "...", "...", nil))
}

func wsHandler(w http.ResponseWriter, r *http.Request) {
    if r.Header.Get("Origin")+":8080" != "https://...:8080" {
        http.Error(w, "Origin not allowed", 403)
        fmt.Println("Client origin not allowed! (https://"+r.Host+")")
        fmt.Println("r.Header Origin: "+r.Header.Get("Origin"))
        return
    }
    ///
    conn, err := websocket.Upgrade(w, r, w.Header(), 1024, 1024)
    if err != nil {
        http.Error(w, "Could not open websocket connection", http.StatusBadRequest)
        fmt.Println("Could not open websocket connection with client!")
        return // conn is nil here, so stop before it is stored or used
    }

    //ADD CONNECTION TO guestMap IF CONNECTION IS nil
    var authString string = /*gets device identity*/;
    var authChan chan string = make(chan string);
    authValue, authOK := guestMap.Load(authString);
    if !authOK {
        // NO SESSION, CREATE A NEW ONE
        newSession = getSession();
        //defer newSession.Close();
        guestMap.Store(authString, connStruct{ LoggedIn: false,
                                ConnRoutineChans: []*chan string{&authChan},
                                         Login: "",
                                        Sockets: []*websocket.Conn{conn},
                                        /* .... */ });
    }else{
        //SESSION STARTED, ADD NEW SOCKET TO Sockets
        var tempConn connStruct = authValue.(connStruct);
        tempConn.Sockets = append(tempConn.Sockets, conn);
        tempConn.ConnRoutineChans = append(tempConn.ConnRoutineChans, &authChan)
        guestMap.Store(authString, tempConn);
    }

    //
    go echo(conn, authString, &authChan);
}

func echo(conn *websocket.Conn, authString string, authChan *chan string) {

    var message msg;

    //TEST CHANNEL
    authValue, _ := guestMap.Load(authString);
    go sendToChans(authValue.(connStruct).ConnRoutineChans, "sup dude?")

    fmt.Println("got past send...");

    for {
        select {
            case val := <-*authChan:
                // use value of channel
                fmt.Println("AuthChan for user #"+strconv.Itoa(myConnNumb)+" spat out: ", val)
            default:
                // if channels are empty, this is executed
        }

        readError := conn.ReadJSON(&message)
        fmt.Println("got past readJson...");

        if readError != nil || message.Key == "" {
            //DISCONNECT USER
            //.....
            return
        }

        //
        _key, _value := chief(message.Key, message.Value, &*conn, browserAndOS, authString)

        if writeError := conn.WriteJSON(_key + "|" + _value); writeError != nil {
            //...
            return
        }

        fmt.Println("got past writeJson...");
    }
}

func sendToChans(chans []*chan string, message string){
    for i := 0; i < len(chans); i++ {
        *chans[i] <- message
    }
}

I know, a big block of code eh? And I commented out most of it...

Anyway, if you've ever used a websocket most of it should be quite familiar:

1) func wsHandler() fires every time a user connects. It makes an entry in guestMap (for each unique device that connects) which holds a connStruct which holds a list of channels: ConnRoutineChans []*chan string. This all gets passed to:

2) echo(), which is a goroutine that constantly runs for each websocket connection. Here I'm just testing out sending a message to other running goroutines, but it seems my for loop isn't actually constantly firing. It only fires when the websocket receives a message from the open tab/window it's connected to. (If anyone can clarify this mechanic, I'd love to know why it's not looping constantly?)

3) For each window or tab that the user has open on a given device, there is a websocket and a channel, each stored in an array. I want to be able to send a message to all the channels in the array (essentially the other goroutines for open tabs/windows on that device) and receive it in those goroutines to change some variables set in the constantly running goroutine.

What I have right now works only for the very first connection on a device, and (of course) it sends "sup dude?" to itself since it's the only channel in the array at the time. Then if you open a new tab (or even many), the message doesn't get sent to anyone at all! Strange?... Then when I close all the tabs (and my commented out logic removes the device item from guestMap) and start up a new device session, still only the first connection gets its own message.

I already have a method for sending a message to all the other websockets on a device, but sending to a goroutine seems to be a little more tricky than I thought.


Solution

  • To answer my own question:

    First, I've switched from a sync.Map to a normal map. Second, so that nobody reads from or writes to it at the same time, I've made a channel that you send to for any read/write operation on the map; a single listener goroutine performs the operations one at a time. I've been trying my best to keep my data access and manipulation quick to execute so the channel doesn't get backed up. Here's a small example of that:

    package main
    
    import (
        "fmt"
    )
    
    var (
        guestMap           = make(map[string]*guestStruct)
        guestMapActionChan = make(chan actionStruct)
    )
    
    type actionStruct struct {
        Action      func([]interface{})[]interface{}
        Params      []interface{}
        ReturnChan  chan []interface{}
    }
    
    type guestStruct struct {
        Name string
        Numb int
    }
    
    func main(){
        //make chan listener
        go guestMapActionChanListener(guestMapActionChan)
    
        //some guest logs in...
        newGuest := guestStruct{Name: "Larry Josher", Numb: 1337}
    
        //add to the map
        addRetChan := make(chan []interface{})
        guestMapActionChan <- actionStruct{Action: guestMapAdd,
                                           Params: []interface{}{&newGuest},
                                           ReturnChan: addRetChan}
        addReturned := <-addRetChan
    
        fmt.Println(addReturned)
        fmt.Println("Also, numb was changed by listener to:", newGuest.Numb)
    
        // Same kind of thing for removing, except (of course) there's
        // a lot more logic to a real-life application.
    }
    
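    // guestMapActionChanListener runs every action on this one goroutine,
    // so actions sent through the channel never touch guestMap concurrently.
    func guestMapActionChanListener(c chan actionStruct) {
        for {
            value := <-c
            returned := value.Action(value.Params)
            value.ReturnChan <- returned
            close(value.ReturnChan)
        }
    }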
    
    func guestMapAdd(params []interface{}) []interface{} {
        //.. do some parameter verification checks
        theStruct := params[0].(*guestStruct)
        name := theStruct.Name
        theStruct.Numb = 75
        guestMap[name] = theStruct
    
        return []interface{}{"Added '"+name+"' to the guestMap"}
    }
    

    For communication between connections, I just have each socket loop hold onto its guestStruct, and have more guestMapActionChan functions that take care of distributing data to other guests' guestStructs.
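
    As a rough illustration of the socket-loop side, here is a minimal sketch, not the code from my project. It assumes the blocking read (conn.ReadJSON in the question) is moved into its own goroutine, so the loop can select on socket messages and on messages from other goroutines at the same time. The names guest, inbox, pump, loop and demo are hypothetical, and a plain channel stands in for the real websocket so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// guest is a hypothetical stand-in for one open tab: it owns an inbox
// channel that other goroutines can send to.
type guest struct {
	name  string
	inbox chan string
}

// pump calls a blocking read function in its own goroutine and forwards
// each message into a channel, closing the channel on the first error.
// In a real server, read would wrap a call such as conn.ReadJSON.
func pump(read func() (string, error)) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for {
			m, err := read()
			if err != nil {
				return
			}
			out <- m
		}
	}()
	return out
}

// loop waits on the socket and the inbox at the same time, so a read
// that is still blocked never delays a message from another goroutine.
func loop(g *guest, fromSocket <-chan string, log chan<- string) {
	for {
		select {
		case m, ok := <-fromSocket:
			if !ok {
				return // socket closed: stop this guest's loop
			}
			log <- g.name + " socket: " + m
		case m := <-g.inbox:
			log <- g.name + " inbox: " + m
		}
	}
}

// demo drives one guest with a fake socket and returns what the loop logged.
func demo() []string {
	socket := make(chan string) // fake browser side; nothing sent yet
	read := func() (string, error) {
		m, ok := <-socket
		if !ok {
			return "", errors.New("socket closed")
		}
		return m, nil
	}
	g := &guest{name: "tab1", inbox: make(chan string)}
	log := make(chan string)
	done := make(chan struct{})
	go func() {
		loop(g, pump(read), log)
		close(done)
	}()

	var got []string
	g.inbox <- "sup dude?" // arrives although read is still blocked
	got = append(got, <-log)
	socket <- "hello"
	got = append(got, <-log)
	close(socket) // simulates the tab closing
	<-done
	return got
}

func main() {
	for _, line := range demo() {
		fmt.Println(line)
	}
}
```

    The point of the sketch is that the inbox message is handled while the read is still waiting on the browser, which is what the loop in the question could not do because its select ran only between blocking reads.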

    Now, I'm not going to mark this as the correct answer unless I get some better suggestions on how to do something like this the right way. But for now this is working and should guarantee no races for reading/writing to the map.

    Edit: The correct approach should really have been to just use a sync.Mutex, like I do in the (mostly) finished project GopherGameServer.
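
    For comparison with the channel version above, a minimal sketch of the mutex approach could look like the following. It is not taken from GopherGameServer; guestStore and its methods are hypothetical names, and guestStruct is the same small struct as in the earlier example.

```go
package main

import (
	"fmt"
	"sync"
)

type guestStruct struct {
	Name string
	Numb int
}

// guestStore is a hypothetical wrapper that keeps the map and the mutex
// guarding it together, so callers cannot reach the map without locking.
type guestStore struct {
	mu     sync.Mutex
	guests map[string]*guestStruct
}

func newGuestStore() *guestStore {
	return &guestStore{guests: make(map[string]*guestStruct)}
}

// Add stores g under its name, replacing any previous entry.
func (s *guestStore) Add(g *guestStruct) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.guests[g.Name] = g
}

// Get returns the guest stored under name, if any.
func (s *guestStore) Get(name string) (*guestStruct, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	g, ok := s.guests[name]
	return g, ok
}

// Remove deletes the guest stored under name; a missing name is a no-op.
func (s *guestStore) Remove(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.guests, name)
}

// Len reports how many guests are currently stored.
func (s *guestStore) Len() int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return len(s.guests)
}

func main() {
	store := newGuestStore()

	// Many goroutines add guests at once; the mutex serializes map access.
	var wg sync.WaitGroup
	for i := 0; i < 100; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			store.Add(&guestStruct{Name: fmt.Sprintf("guest%d", n), Numb: n})
		}(i)
	}
	wg.Wait()

	fmt.Println("guests stored:", store.Len())
}
```

    Note that the mutex only protects the map itself. Get hands back a pointer, so if several goroutines change fields on the same guestStruct, those fields need their own locking (or the changes should happen inside a method that holds the lock).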