
Relay Modern: Connecting websocket to network layer


I’m having issues figuring out how to connect the Relay Modern network layer with my websocket instance.

I’m currently creating the websocket instance like this:

const subscriptionWebSocket = new ReconnectingWebSocket('ws://url.url/ws/subscriptions/', null, options);

I'm specifying the subscription and calling requestSubscription:

const subscription = graphql`
  subscription mainSubscription {
    testData {
      anotherNode {
        data
      }
    }
  }
`;

requestSubscription(
  environment,
  {
    subscription,
    variables: {},
    onComplete: () => {...},
    onError: (error) => {...},
    onNext: (response) => {...},
    updater: (updaterStoreConfig) => {...},
  },
);

Which then allows me to send any subscription requests:

function subscriptionHandler(subscriptionConfig, variables, cacheConfig, observer) {
  subscriptionWebSocket.send(JSON.stringify(subscriptionConfig.text));

  return {
    dispose: () => {
      console.log('subscriptionHandler: Disposing subscription');
    },
  };
}

const network = Network.create(fetchQuery, subscriptionHandler);

through to my server (currently using Graphene-python), and I’m able to interpret the received message on the server.

What I can’t figure out is how to respond to a subscription. For example, when something changes in my DB, I want to generate a response and send it to any subscribers.

The question is: how do I connect the onmessage event from my websocket instance to my Relay Modern network layer? I've browsed through the Relay source but can't figure out which callback or method should handle incoming messages.

Any tips are appreciated.


Solution

  • I’ll just write down how I’ve approached this issue after the assistance found in this thread. It might be usable for someone else. This is very dependent on the server-side solution that you've chosen.

    My approach:

    Firstly I built a SubscriptionHandler that will handle the requestStream#subscribeFunction through SubscriptionHandler#setupSubscription.

    The SubscriptionHandler instantiates a WebSocket (using a custom version of ReconnectingWebSockets) and attaches the onmessage event to an internal method (SubscriptionHandler#receiveSubscriptionPayload) which will add the payload to the corresponding request.

    We create new subscriptions through SubscriptionHandler#newSubscription which will use the internal attribute SubscriptionHandler.subscriptions to add a keyed entry of this subscription (we use an MD5-hash util over the query and variables); meaning the object will come out as:

    SubscriptionHandler.subscriptions = {
      [md5hash]: {
        query: QueryObject,
        variables: SubscriptionVariables,
        observer: Observer, // contains the onNext method
      },
    }
    

    Whenever the server sends a subscription response, SubscriptionHandler#receiveSubscriptionPayload is called. It identifies which subscription the payload belongs to by hashing the query and variables echoed back by the server, then calls the onNext method of the observer stored for that hash in SubscriptionHandler.subscriptions.

    This approach requires the server to return a message such as:

    export type ServerResponseMessageParsed = {
      payload: QueryPayload,
      request: {
        query: string,
        variables: Object,
      }
    }
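
A minimal, standalone sketch of that routing, to make the message shape concrete. It assumes Node's built-in crypto module for the MD5 hash (the handler below uses a separate md5 util), and the register and route helpers and the collecting observer are hypothetical names used only for illustration:

```javascript
const crypto = require('crypto');

// Same idea as getHash in the handler: strip whitespace from the query,
// append the serialized variables, and MD5 the result.
function getHash(query, variables) {
  const queryString = query.replace(/\s+/g, '');
  const variablesString = JSON.stringify(variables);
  return crypto
    .createHash('md5')
    .update(queryString + variablesString)
    .digest('hex');
}

// Registry keyed by hash, mirroring SubscriptionHandler.subscriptions.
const subscriptions = {};

// Hypothetical helper: store an observer under its query/variables hash.
function register(query, variables, observer) {
  subscriptions[getHash(query, variables)] = { query, variables, observer };
}

// Hypothetical helper: route a raw server message (a JSON string) to the
// matching observer. Returns true if a subscriber was found.
function route(rawMessage) {
  const { payload, request } = JSON.parse(rawMessage);
  const entry = subscriptions[getHash(request.query, request.variables)];
  if (!entry) return false;
  entry.observer.onNext(payload);
  return true;
}

// Hypothetical subscriber that just collects the payloads it receives.
const received = [];
register(
  'subscription mainSubscription { testData { anotherNode { data } } }',
  {},
  { onNext: (payload) => received.push(payload) },
);

// The server may format the query differently; whitespace is ignored.
const handled = route(JSON.stringify({
  payload: { data: { testData: { anotherNode: { data: 'hello' } } } },
  request: {
    query: 'subscription mainSubscription {\n  testData { anotherNode { data } }\n}',
    variables: {},
  },
}));
```

Note that JSON.stringify is sensitive to key order, so the server has to echo the variables with the same keys in the same order the client sent them, otherwise the hashes will not match.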
    

    I do not know if this is a great way of handling subscriptions, but it works for now with my current setup.
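
For reference, hooking the handler into Relay could look roughly like the configuration sketch below. It assumes the SubscriptionHandler module listed further down is saved as ./SubscriptionHandler, and the fetchQuery function with its '/graphql' endpoint is a hypothetical placeholder for whatever fetch function the app already uses:

```javascript
import { Environment, Network, RecordSource, Store } from 'relay-runtime';
// Assumed path to the module shown below.
import subscriptionHandler from './SubscriptionHandler';

// Hypothetical fetch function for queries and mutations; the '/graphql'
// endpoint is an assumption.
function fetchQuery(operation, variables) {
  return fetch('/graphql', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ query: operation.text, variables }),
  }).then((response) => response.json());
}

// Relay calls setupSubscription for every requestSubscription(...) call and
// passes it the observer whose onNext receives the server payloads.
const network = Network.create(fetchQuery, subscriptionHandler.setupSubscription);

const environment = new Environment({
  network,
  store: new Store(new RecordSource()),
});

// Give the handler the environment so newSubscription can use it.
subscriptionHandler.attachEnvironment(environment);

export default environment;
```

The handler has to exist before the network (which needs setupSubscription), and the network before the environment, which is why the environment is attached to the handler afterwards rather than passed to its constructor.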

    SubscriptionHandler.js

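    import { requestSubscription } from 'react-relay';
    // md5, ReconnectingWebSocket, DEFAULT_CONFIG, WS_URL, WS_OPTIONS and the
    // Flow types used below are assumed to be imported from elsewhere in the
    // project.

    class SubscriptionHandler {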
      subscriptions: Object;
      subscriptionEnvironment: RelayModernEnvironment;
      websocket: Object;
    
      /**
       * The SubscriptionHandler constructor. Sets up a new websocket and binds
       * it to an internal method that handles messages received from the ws server.
       *
       * @param  {string} websocketUrl      - The WebSocket URL to listen to.
       * @param  {Object} webSocketSettings - The options object.
       *                                      See ReconnectingWebSocket.
       */
      constructor(websocketUrl: string, webSocketSettings: WebSocketSettings) {
        // All subscription hashes and objects will be stored in the
        // this.subscriptions attribute on the subscription handler.
        this.subscriptions = {};
    
        // Store the given environment internally to be reused when registering new
        // subscriptions. This is required as per the requestRelaySubscription spec
        // (method requestSubscription).
        this.subscriptionEnvironment = null;
    
        // Create a new WebSocket instance to be able to receive messages on the
        // given URL. Always opt for default protocol for the RWS, second arg.
        this.websocket = new ReconnectingWebSocket(
          websocketUrl,
          null,  // Protocol.
          webSocketSettings,
        );
    
        // Bind an internal method to handle incoming messages from the websocket.
        this.websocket.onmessage = this.receiveSubscriptionPayload;
      }
    
      /**
       * Method to attach the Relay Environment to the subscription handler.
       * This is required as the Network needs to be instantiated with the
       * SubscriptionHandler's methods, and the Environment needs the Network Layer.
       *
       * @param  {Object} environment - The apps environment.
       */
      attachEnvironment = (environment: RelayModernEnvironment) => {
        this.subscriptionEnvironment = environment;
      }
    
      /**
       * Generates a hash from a given query and variable pair. The method
       * used is a recreatable MD5 hash, which is used as a 'key' for the given
       * subscription. Using the MD5 hash we can identify what subscription is valid
       * based on the query/variable given from the server.
       *
       * @param  {string} query     - A string representation of the subscription.
       * @param  {Object} variables - An object containing all variables used
       *                              in the query.
       * @return {string}             The MD5 hash of the query and variables.
       */
      getHash = (query: string, variables: HashVariables) => {
        const queryString = query.replace(/\s+/gm, '');
        const variablesString = JSON.stringify(variables);
        const hash = md5(queryString + variablesString).toString();
        return hash;
      }
    
      /**
       * Method to be bound to the class websocket instance. The method will be
       * called each time the WebSocket receives a message on the subscribed URL
       * (see this.websocket options).
       *
       * @param  {Object} message - The message event received from the
       *                            websocket; message.data holds the JSON string.
       */
      receiveSubscriptionPayload = (message: ServerResponseMessage) => {
        const response: ServerResponseMessageParsed = JSON.parse(message.data);
        const { query, variables } = response.request;
        const hash = this.getHash(query, variables);
    
        // Fetch the subscription instance from the subscription handlers stored
        // subscriptions.
        const subscription = this.subscriptions[hash];
    
        if (subscription) {
          // Execute the onNext method with the received payload after validating
          // that the received hash is currently stored. If a diff occurs, meaning
          // no hash is stored for the received response, ignore the execution.
          subscription.observer.onNext(response.payload);
        } else {
          console.warn(`Received payload for unregistered hash: ${hash}`);
        }
      }
    
      /**
       * Method to generate new subscriptions that will be bound to the
       * SubscriptionHandler's environment and will be stored internally in the
       * instantiated handler object.
       *
       * @param {Object} subscriptionQuery - The graphql tagged query to subscribe
       *                                     to. Needs to be a validated
       *                                     subscription type.
       * @param {Object} variables         - The variables for the passed query.
       * @param {Object} configs           - Optional subscription configuration
       *                                     that overrides the defaults.
       */
      newSubscription = (
          subscriptionQuery: GraphQLTaggedNode,
          variables: Variables,
          configs: GraphQLSubscriptionConfig,
      ) => {
        const config = configs || DEFAULT_CONFIG;
    
        requestSubscription(
          this.subscriptionEnvironment,
          {
            subscription: subscriptionQuery,
            variables,
            ...config,
          },
        );
      }
    
      /**
       * The subscribe function handed to the Relay Network layer. Stores the
       * observer under the query/variables hash and sends the subscription
       * request to the server.
       */
      setupSubscription = (
        config: ConcreteBatch,
        variables: Variables,
        cacheConfig: ?CacheConfig,
        observer: Observer,
      ) => {
        const query = config.text;

        // Get the hash from the given subscriptionQuery and variables. Used to
        // identify this specific subscription.
        const hash = this.getHash(query, variables);

        // Store the subscription together with its observer so that incoming
        // messages can be routed to it.
        this.subscriptions[hash] = { query, variables, observer };

        // Temporary workaround: delay the send slightly so the websocket has
        // time to reach the OPEN state.
        setTimeout(() => {
          this.websocket.send(JSON.stringify({ query, variables }));
        }, 100);

        // Return a disposable, as the handler in the question does, and drop
        // the stored subscription when it is disposed.
        return {
          dispose: () => {
            delete this.subscriptions[hash];
          },
        };
      }
    }
    
    const subscriptionHandler = new SubscriptionHandler(WS_URL, WS_OPTIONS);
    
    export default subscriptionHandler;
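
The fixed 100 ms delay in setupSubscription only works if the socket happens to open within that time. One possible alternative, sketched below, is to buffer outgoing messages until the socket reports that it is open. This assumes the socket exposes readyState and addEventListener('open', ...) like the standard WebSocket API; whether a custom ReconnectingWebSocket does so would need checking. createQueuedSender and FakeSocket are hypothetical names, and FakeSocket only exists so the sketch runs without a server:

```javascript
// Value of WebSocket.OPEN in the standard WebSocket API.
const OPEN = 1;

// Wraps a socket-like object: sends immediately when the socket is open,
// otherwise buffers the message and flushes it once 'open' fires.
function createQueuedSender(socket) {
  const pending = [];

  socket.addEventListener('open', () => {
    while (pending.length > 0) {
      socket.send(pending.shift());
    }
  });

  return function send(message) {
    if (socket.readyState === OPEN) {
      socket.send(message);
    } else {
      pending.push(message);
    }
  };
}

// Hypothetical stand-in for a websocket.
class FakeSocket {
  constructor() {
    this.readyState = 0; // CONNECTING
    this.sent = [];
    this.listeners = {};
  }

  addEventListener(type, listener) {
    (this.listeners[type] = this.listeners[type] || []).push(listener);
  }

  send(message) {
    this.sent.push(message);
  }

  // Simulates the connection being established.
  open() {
    this.readyState = OPEN;
    (this.listeners.open || []).forEach((listener) => listener());
  }
}

const socket = new FakeSocket();
const send = createQueuedSender(socket);

// Buffered, because the socket is still connecting.
send(JSON.stringify({ query: 'subscription a { x }', variables: {} }));
const sentBeforeOpen = socket.sent.length;

// Opening the socket flushes the buffer.
socket.open();

// Sent directly, because the socket is now open.
send(JSON.stringify({ query: 'subscription b { y }', variables: {} }));
```

In the handler, the send inside setTimeout would then become a call to such a queued sender. Re-sending active subscriptions after a reconnect is a separate concern that this sketch does not cover.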