Search code examples
python apollo gql

How do I communicate using gql with Apollo websocket protocol


I want to connect to the websocket. When I inspect the traffic between the client and the server I see that the first message is the handshake:

{"type": "connection_init",
 "payload":
           {
            "accept-language": "en",
            "ec-version": "5.1.88",
            "referrer": URL,
           }
}

Based on the format (the keys of the dict), I conclude that the websocket uses an Apollo websocket transport protocol.

Next, I am following the websocket-example of gql's documentation.

import asyncio
import logging
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport


# Configure the root logger to emit records at INFO level and above.
logging.basicConfig(level=logging.INFO)
    
async def main():
    """Open a gql session over the websocket transport.

    The transport sends ``init_payload`` as the payload of the
    ``connection_init`` handshake. ``URL`` is a placeholder for the
    websocket endpoint and must be defined by the caller.
    """
    transport = WebsocketsTransport(url=URL,
                                    init_payload={'accept-language': 'en',
                                                  'ec-version': '5.1.88'})
    async with Client(transport=transport,
                      fetch_schema_from_transport=False) as session:
        # do something
        pass  # placeholder: a block needs at least one statement to parse
    

# Only start the event loop when executed as a script, so that importing this
# module does not open a websocket connection as a side effect.
if __name__ == "__main__":
    asyncio.run(main())

After reading more about the protocol here, I still don't understand how I can send messages to the server from within my Python script. How do I send the message below to the websocket?

{
 "id": "1073897396",
 "type": "start",
 "operationName": operation,
 "eventId": 488
}

Solution

  • The EuropeanTour website does not use the Apollo websocket protocol for its API, so you cannot use the standard WebsocketsTransport class as-is.

    What I found out:

    • instead of returning a connection_ack message after the connection_init, it returns a wsIdentity message
    • for a subscription, the client will first send a start message containing an operationName and an eventId, without any GraphQL query
    • the server will then answer with a request-full-subscription message, requiring the client to send the full subscription query
    • the client then has to send a full-subscription message containing the full requested query, including the operationName and a new field subscriptionName
    • the id is not actually random: it is generated by hashing the subscriptionName and the variables together, after serializing them to JSON with the dictionary keys sorted. It took me a while to figure this out, as no error is generated if the id is not correct
    • the server seems to cache the GraphQL subscription (probably based on the IP address), so it will not ask for it again with a request-full-subscription message if it has already received it previously

    Here is a Chrome screenshot showing the request-full-subscription messages:

    Chrome websocket messages

    You can then create your own transport by inheriting from the WebsocketsTransport class and making the necessary modifications.

    Here is an example of working code:

    
    import asyncio
    import json
    import logging
    from typing import Any, AsyncGenerator, Dict, Optional, Tuple
    
    from graphql import DocumentNode, ExecutionResult, print_ast
    
    from gql import Client, gql
    from gql.transport.exceptions import TransportProtocolError
    from gql.transport.websockets import WebsocketsTransport
    
    # Configure the root logger to emit records at INFO level and above.
    logging.basicConfig(level=logging.INFO)
    
    
    class EuropeanTourWebsocketsTransport(WebsocketsTransport):
        """Websockets transport for the EuropeanTour subscription API.

        The server speaks a variant of the Apollo websocket protocol:

        * the ``connection_init`` handshake is answered with a ``wsIdentity``
          message instead of ``connection_ack``;
        * a subscription is opened with a ``start`` message that carries only
          the operation name and an event id, without the GraphQL query;
        * the server may answer with ``request-full-subscription``, in which
          case the saved query is sent in a ``full-subscription`` message.
        """

        def _hash(self, e):
            """Return an unsigned 32-bit hash of the string *e*.

            The characters are consumed from the last one to the first,
            starting from the seed 5381 and folding each character code in
            with ``t * 33 ^ code``. An empty string hashes to 5381.
            """
            t = 5381
            for char in reversed(e):
                # Python integers never overflow, so without a per-step mask
                # the accumulator would keep growing with the input length.
                # Masking every step yields the same low 32 bits as masking
                # once at the end.
                t = (t * 33 ^ ord(char)) & 0xFFFFFFFF
            return t

        def _calc_id(self, subscription_name, variables):
            """Compute the subscription id the server expects.

            The id is the hash of the compact JSON serialization (sorted
            keys, no whitespace after separators) of the subscription name
            and the variables.
            """
            obj_stringified = json.dumps(
                {
                    "subscriptionName": subscription_name,
                    "variables": variables,
                },
                separators=(",", ":"),
                sort_keys=True,
            )

            return self._hash(obj_stringified)

        async def _send_query(
            self,
            document: DocumentNode,
            variable_values: Optional[Dict[str, Any]] = None,
            operation_name: Optional[str] = None,
        ) -> int:
            """Send the ``start`` message and remember the full query.

            Reads ``latest_subscription_name`` and ``latest_event_id``, which
            are set by :meth:`subscribe`. The full query payload is stored
            under the stringified id so that it can be sent later if the
            server asks for it.

            Returns:
                The computed query id.
            """
            # Calculate the id by hashing the subscription name and the variables
            query_id = self._calc_id(self.latest_subscription_name, variable_values)

            # Creating the payload for the full subscription
            payload: Dict[str, Any] = {"query": print_ast(document)}
            if variable_values:
                payload["variables"] = variable_values
            if operation_name:
                payload["operationName"] = operation_name
                payload["subscriptionName"] = self.latest_subscription_name

            # Saving the full query first and waiting for the server to request it later
            self.saved_full_subscriptions[str(query_id)] = payload

            # Then first start to request the subscription only with the operation name
            start_message = {
                "id": str(query_id),
                "type": "start",
                "operationName": operation_name,
                "eventId": self.latest_event_id,
            }

            await self._send(json.dumps(start_message))

            return query_id

        async def subscribe(
            self,
            document: DocumentNode,
            *,
            variable_values: Optional[Dict[str, Any]] = None,
            operation_name: str,
            subscription_name: str,
            event_id: int,
            send_stop: Optional[bool] = True,
        ) -> AsyncGenerator[ExecutionResult, None]:
            """Subscribe to *document* and yield every result received.

            ``subscription_name`` and ``event_id`` are stored on the instance
            so that :meth:`_send_query` can use them; the remaining arguments
            are forwarded to the parent ``subscribe``.
            """
            self.latest_event_id = event_id
            self.latest_subscription_name = subscription_name

            async for result in super().subscribe(
                document,
                variable_values=variable_values,
                operation_name=operation_name,
                send_stop=send_stop,
            ):
                yield result

        async def _wait_ack(self) -> None:
            """Wait for the ``wsIdentity`` message that acknowledges the connection.

            Also resets the store of saved full subscriptions.

            Raises:
                TransportProtocolError: if the first answer received is not a
                    ``wsIdentity`` message.
            """
            self.saved_full_subscriptions = {}

            init_answer = await self._receive()
            answer_type, _, _ = self._parse_answer(init_answer)

            if answer_type != "wsIdentity":
                raise TransportProtocolError(
                    "Websocket server did not return a wsIdentity response"
                )

        def _parse_answer(
            self, answer: str
        ) -> Tuple[str, Optional[int], Optional[ExecutionResult]]:
            """Parse a raw server message into ``(type, id, execution_result)``.

            ``wsIdentity`` and ``request-full-subscription`` messages are
            recognised here; every other message is delegated to
            ``_parse_answer_apollo``.

            Raises:
                TransportProtocolError: if *answer* is not valid JSON.
            """
            try:
                json_answer = json.loads(answer)
            except ValueError as err:
                # Keep the decoding error attached as the cause.
                raise TransportProtocolError(
                    f"Server did not return a GraphQL result: {answer}"
                ) from err

            if "wsIdentity" in json_answer:
                return ("wsIdentity", json_answer["wsIdentity"], None)

            if "type" in json_answer and json_answer["type"] == "request-full-subscription":
                return ("request-full-subscription", json_answer["id"], None)

            return self._parse_answer_apollo(json_answer)

        async def send_full_subscription(self, answer_id: str) -> None:
            """Send the saved full query for the subscription *answer_id*.

            Raises:
                TransportProtocolError: if no query was saved under that id.
            """
            # Saved queries are keyed by the stringified id (see _send_query),
            # so normalise the id before the lookup.
            subscription_id = str(answer_id)

            try:
                payload = self.saved_full_subscriptions[subscription_id]
            except KeyError:
                raise TransportProtocolError(
                    f"Full subscription not found for id {answer_id}"
                ) from None

            query_str = json.dumps(
                {"id": subscription_id, "type": "full-subscription", "payload": payload}
            )

            await self._send(query_str)

        async def _handle_answer(
            self,
            answer_type: str,
            answer_id: Optional[int],
            execution_result: Optional[ExecutionResult],
        ) -> None:
            """Dispatch a parsed answer.

            ``request-full-subscription`` is answered by sending the saved
            full query; every other answer type is left to the parent class.
            """
            if answer_type == "request-full-subscription":
                await self.send_full_subscription(answer_id)
                return

            await super()._handle_answer(answer_type, answer_id, execution_result)
    
    
    async def main():
        """Subscribe to the group scores of one team and print each result."""
        handshake_payload = {
            "accept-language": "en",
            "ec-version": "5.1.88",
            "operator": "europeantour",
            "referrer": "https://www.europeantour.com/",
            "sport": "GOLF",
        }
        ws_transport = EuropeanTourWebsocketsTransport(
            url="wss://btec-websocket.services.imgarena.com",
            init_payload=handshake_payload,
        )

        # Full text of the subscription; it is only sent if the server asks for it.
        subscription_text = """
    subscription ShotTrackerSubscribeToGolfTournamentGroupScores($input: SubscribeToGolfTournamentGroupScoresInput!) {
      subscribeToGolfTournamentGroupScores(input: $input) {
        groupId
        l1Course
        teamId
        players {
          id
          lastName
          firstName
        }
        roundScores {
          courseId
          roundNo
          toParToday {
            value
          }
          holesThrough {
            value
          }
          startHole
          holes {
            holePar
            holeStrokes
            holeOrder
            holeNumber
          }
          isPlayoff
        }
        toPar {
          value
        }
        tournamentPosition {
          format
          value
          displayValue
        }
        status
      }
    }
    """

        group_scores_input = {"teamId": 21, "tournamentId": 488, "roundNo": 4}
        subscribe_options = {
            "operation_name": "ShotTrackerSubscribeToGolfTournamentGroupScores",
            "variable_values": {"input": group_scores_input},
            "subscription_name": "subscribeToGolfTournamentGroupScores",
            "event_id": 488,
        }

        gql_client = Client(transport=ws_transport, fetch_schema_from_transport=False)
        async with gql_client as gql_session:
            document = gql(subscription_text)
            async for update in gql_session.subscribe(document, **subscribe_options):
                print(update)
    
    
    # Only start the event loop when executed as a script, so that importing
    # this module does not open a websocket connection as a side effect.
    if __name__ == "__main__":
        asyncio.run(main())