Search code examples
ruby-on-railsrubywebsocketruby-on-rails-5

How to provide Rails websocket to an existing client (any generic message)?


After trying different things in an attempt to get this working, I haven't managed to yet wrap my head around Rails' magical values in order to provide a websocket interface. Client code is known to work, as it has been already tested with NodeJS and FastAPI interfaces. To my understanding the correct way for the Rails server to read/respond to WS events is through the implemented 'send_msg' method in the last code snippet, but how should the method be called? It seems that in order to call the send_msg method I would have to modify the client code to use a JS library (as in here) provided by Rails, which is not possible.

As the title says, the question would be how to create a simple (but generic?) WS message receiver/broadcaster?

How the websocket interface should work

  • Have a ws: endpoint at /clock
  • Client can connect to /clock
  • When a client sends a WS message with the data 'requestTime' to /clock, the API broadcasts server system time to all connected clients
  • Client code cannot be altered

How client attempts to connect and request time (NodeJS) (connect X clients and broadcast time Y times)

import async from "async";
import fetch from "node-fetch";
import fs from "fs";
import ws from "ws";

// client that only counts the received messages
function wsFunction(requestCount) {
    return resolve => {
        let limit = requestCount;
        // construct all promises
        const client = new ws("ws://localhost:3000/clock");

        client.on('message', data => {
            if(--limit == 0) {
                client.close();
            }
        });

        client.on('close', () => {
            if(limit > 0) {
                console.log("Socket closed prematurely... ", limit);
            }
        });

        client.on('open', () => {
            resolve(client); // client connected
        });

        const close = () => {
            if(client.readyState !== ws.CLOSED && client.readyState !== ws.CLOSING) {
                client.close();
            }
        }
    }
}

/**
 * 
 * @param {*} limit 
 * @returns operation time for limit messages, or -1 if connection is cut
 */
function attemptClockFetches(clientCount, retrieveCount) {
    const clients = [];
    for(let i = 0; i < clientCount - 1; ++i) {
        clients.push(async () => new Promise(wsFunction(retrieveCount)));
    }

    // client that initiates the broadcast
    const promise = new Promise(async resolve => {
        const startTime = performance.now();
        const sockets = await async.parallel(clients); // connect all clients

        // create updater client
        const client = new ws("ws://localhost:3000/clock");

        // now update until limit is reached
        client.on('close', () => {
            if(retrieveCount > 0) {
                console.log("Parent socket closed prematurely...");
            }
        });

        client.on('message', () => {
            if(--retrieveCount > 0) {
                client.send("requestTime");
            } else {
                client.close();
                const endTime = performance.now();
                // close all sockets
                for(let s of sockets) {
                    s.close();
                }
                resolve(endTime - startTime);
            }
        });

        client.on('open', () => {
            client.send("requestTime");
        });
    });

    return promise;
}

async function doStressTest() {
    await attemptClockFetches(10, 10);
}

const i = setInterval(() => {
    // prevent node from killing process
}, 1000);

doStressTest().then(() => {
    clearInterval(i);
});

A snippet of a working NodeJS WebSocket responder, essentially this is what needs to be replicated in Rails

const wsServer = new ws.WebSocketServer({ server: server, path: "/clock" });

wsServer.on('connection', socket => {
    socket.on('error', err => {
        console.error(err);
    });

    socket.on('message', data => {
        if(data.toString() === "requestTime") {
            // broadcast time on requestTime event to all clients
            wsServer.clients.forEach(client => {
                if(client.readyState === ws.OPEN) {
                    client.send((new Date()).getMilliseconds());
                }
            });
        }
    });
});

What I currently have implemented I've added this to routes.rb, assuming that it directs all WS events to path /clock which is ClocksChannel

Rails.application.routes.draw do
  get '/users/:userId/cards', to: 'card#index'
  # get '/clock', to: 'card#clock' <- ADDING THIS MAKES RAILS RESPOND IN HTTP EVEN THOUGH USING WebSocket PROTOCOL

  mount ActionCable.server => '/clock'
end

Contents of the main card_controller.rb

class CardController < ApplicationController
    def index
        # do some index things, not part of WS
    end

    # def clock
    #     render "Hello World"
    # end
end

Implemented this channel, assuming that it subscribes and unsubscribes the clients. As for calling send_msg, I don't have a clear understanding as to how it should be called

require "time"

class ClocksChannel < ApplicationCable::Channel
  def subscribed
    # stream_from "some_channel"
  end

  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
  end

  def send_msg(data)
    if data == "requestTime"
          ActionCable.server.broadcast "requestTime", message: (Time.now.to_f * 1000).to_i
    end
  end
end

When the server receives a connection with the given setup, the following output is given from within the Rails libraries:

Started GET "/clock" for 127.0.0.1 at 2023-03-09 20:12:29 +0200
Started GET "/clock/" [WebSocket] for 127.0.0.1 at 2023-03-09 20:12:29 +0200
Successfully upgraded to WebSocket (REQUEST_METHOD: GET, HTTP_CONNECTION: Upgrade, HTTP_UPGRADE: websocket)
There was an exception - JSON::ParserError(859: unexpected token at 'requestTime')
There was an exception - JSON::ParserError(859: unexpected token at 'requestTime')
C:/Ruby31-x64/lib/ruby/3.1.0/json/common.rb:216:in `parse'
C:/Ruby31-x64/lib/ruby/3.1.0/json/common.rb:216:in `parse'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/activesupport-7.0.4.2/lib/active_support/json/decoding.rb:23:in `decode'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/connection/base.rb:168:in `decode'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/connection/base.rb:89:in `dispatch_websocket_message'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/actioncable-7.0.4.2/lib/action_cable/server/worker.rb:59:in `block in invoke'
C:/Ruby31-x64/lib/ruby/gems/3.1.0/gems/activesupport-7.0.4.2/lib/active_support/callbacks.rb:118:in `block in run_callbacks'
...
...

Solution

  • It seems this is not doable with ApplicationCable, as it requires the data to be in JSON format. In order to fix this I did the following changes:

    Remove all previous code displayed in the question above, as it's no longer necessary/valid

    add gem 'faye-websocket' after which run bundle install

    Create the controller using Faye:

    class ClockController < ApplicationController
        @@clients = []
        def connect
            if Faye::WebSocket.websocket?(request.env)
                ws = Faye::WebSocket.new(request.env)
    
                ws.on :open do |event|
                    # Code to execute when the websocket connection is opened
                    @@clients << ws
                end
    
                ws.on :message do |event|
                    if event.data == 'requestTime'
                        now = Time.now
                        @@clients.each do |client|
                            client.send(now.to_s)
                        end
                    end
                end
    
                ws.on :close do |event|
                    # Code to execute when the websocket connection is closed
                    @@clients.delete(ws)
                    ws = nil
                end
    
                # Return async Rack response
                return ws.rack_response
            else
                # Return regular HTTP response
                render text: 'Not a websocket request'
            end
        end
    end
    

    Add the route to the controller in routes.rb

    get '/clock', to: 'clock#connect', via: :all
    

    This solution may not be perfect in all cases, but it gets the job done well enough