I want to connect to the websocket. When I inspect the traffic between the client and the server I see that the first message is the handshake:
{"type": "connection_init",
"payload":
{
"accept-language": "en",
"ec-version": "5.1.88",
"referrer": URL,
}
}
Based on the format (the keys of the dict) I conclude that the websocket uses an Apollo websocket transport protocol.
Next, I am following the websockets example from gql's documentation.
import asyncio
import logging
from gql import gql, Client
from gql.transport.websockets import WebsocketsTransport
logging.basicConfig(level=logging.INFO)
async def main():
    """Open a gql session over the websocket using the stock transport.

    The transport sends ``init_payload`` as the payload of the
    ``connection_init`` handshake message.
    """
    # NOTE(review): URL is not defined in this snippet; it is presumably the
    # websocket endpoint of the server -- define it before running.
    transport = WebsocketsTransport(
        url=URL,
        init_payload={'accept-language': 'en', 'ec-version': '5.1.88'},
    )
    async with Client(
        transport=transport,
        fetch_schema_from_transport=False,
    ) as session:
        # do something
        # A block needs at least one statement; the comment alone is a
        # syntax error, so keep an explicit placeholder.
        pass


if __name__ == "__main__":
    asyncio.run(main())
After reading more about the protocol here I still don't understand how I can send messages to the server from within my Python script.
How do I send the message below to the websocket?
{
"id": "1073897396",
"type": "start",
"operationName": operation,
"eventId": 488
}
The EuropeanTour website does not use the Apollo websocket protocol for its API, so you cannot use the standard WebsocketsTransport class.
What I found out:
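- Instead of returning a connection_ack message after the connection_init, it returns a wsIdentity message.
- For a subscription, the client first sends a start message containing only the operationName and an eventId, without the actual GraphQL query.
- The server then answers with a request-full-subscription message, requiring the client to send the full subscription query.
- The client then has to send a full-subscription message containing the full requested query, including the operationName and a new field subscriptionName.
- The id of the subscription is generated by hashing the subscriptionName and the variables, serialized as JSON with the dictionary keys sorted. It took me a while to figure it out, as no error is generated if the id is not correct.

Here is a Chrome screenshot showing the request-full-subscription messages: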
You can then create your own transport by inheriting from the WebsocketsTransport class and making the necessary modifications.
Here is an example of working code:
import asyncio
import json
import logging
from typing import Any, AsyncGenerator, Dict, Optional, Tuple
from graphql import DocumentNode, ExecutionResult, print_ast
from gql import Client, gql
from gql.transport.exceptions import TransportProtocolError
from gql.transport.websockets import WebsocketsTransport
logging.basicConfig(level=logging.INFO)
class EuropeanTourWebsocketsTransport(WebsocketsTransport):
    """Websockets transport adapted to a non-Apollo subscription handshake.

    Differences from the parent transport, as implemented below:

    * the acknowledgement awaited after the init message is a message
      containing a ``wsIdentity`` key;
    * a subscription is first announced with a ``start`` message carrying only
      the operation name and an event id; the full query is stored locally;
    * when a ``request-full-subscription`` message arrives, the stored query
      is sent in a ``full-subscription`` message;
    * the subscription id is a 32-bit hash of the subscription name and the
      variables.

    Every other message is delegated to the parent class.
    """

    def _hash(self, e):
        """Return the 32-bit hash of the string ``e``.

        The accumulator starts at 5381 and, walking the characters from last
        to first, is multiplied by 33 and XOR-ed with the character's code
        point. The result is an unsigned 32-bit integer.
        """
        t = 5381
        for char in reversed(e):
            # Masking on every step keeps the accumulator within 32 bits
            # instead of letting it grow into an ever larger integer. The
            # result is the same as masking once at the end, because the low
            # 32 bits of a product or XOR only depend on the low 32 bits of
            # the operands.
            t = ((t * 33) ^ ord(char)) & 0xFFFFFFFF
        return t

    def _calc_id(self, subscription_name, variables):
        """Return the subscription id for a subscription name and variables.

        Both values are serialized to compact JSON (no whitespace, keys
        sorted) and the resulting text is hashed with ``_hash``.
        """
        obj = {
            "subscriptionName": subscription_name,
            "variables": variables,
        }
        obj_stringified = json.dumps(
            obj,
            separators=(",", ":"),
            sort_keys=True,
        )
        return self._hash(obj_stringified)

    async def _send_query(
        self,
        document: DocumentNode,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: Optional[str] = None,
    ) -> int:
        """Announce a subscription and store its full query for later.

        Reads ``self.latest_subscription_name`` and ``self.latest_event_id``,
        which ``subscribe`` sets before delegating to the parent class.

        Returns:
            The computed subscription id.
        """
        # Calculate the id by hashing the subscription name and the variables
        query_id = self._calc_id(self.latest_subscription_name, variable_values)

        # Build the payload of the full subscription
        payload: Dict[str, Any] = {"query": print_ast(document)}
        if variable_values:
            payload["variables"] = variable_values
        if operation_name:
            payload["operationName"] = operation_name
        payload["subscriptionName"] = self.latest_subscription_name

        # Store the full query first; it is only sent once the server asks
        # for it. The key is the id as a string, which is what
        # send_full_subscription looks up.
        self.saved_full_subscriptions[str(query_id)] = payload

        # Start the subscription with the operation name only
        query_str = json.dumps(
            {
                "id": str(query_id),
                "type": "start",
                "operationName": operation_name,
                "eventId": self.latest_event_id,
            }
        )
        await self._send(query_str)

        return query_id

    async def subscribe(
        self,
        document: DocumentNode,
        *,
        variable_values: Optional[Dict[str, Any]] = None,
        operation_name: str,
        subscription_name: str,
        event_id: int,
        send_stop: Optional[bool] = True,
    ) -> AsyncGenerator[ExecutionResult, None]:
        """Subscribe to ``document`` and yield each result of the parent.

        ``subscription_name`` and ``event_id`` are stored on the instance so
        that ``_send_query`` can read them; the remaining arguments are
        forwarded to the parent ``subscribe``.
        """
        # NOTE(review): these attributes are overwritten on every call, so
        # overlapping subscribe() calls on one transport may see each other's
        # values -- confirm whether that can happen.
        self.latest_event_id = event_id
        self.latest_subscription_name = subscription_name

        async for result in super().subscribe(
            document,
            variable_values=variable_values,
            operation_name=operation_name,
            send_stop=send_stop,
        ):
            yield result

    async def _wait_ack(self) -> None:
        """Receive one message and require it to be a ``wsIdentity`` answer.

        Also resets the store of pending full subscriptions.

        Raises:
            TransportProtocolError: if the received message is of another type.
        """
        self.saved_full_subscriptions = {}

        init_answer = await self._receive()
        answer_type, _, _ = self._parse_answer(init_answer)

        if answer_type != "wsIdentity":
            raise TransportProtocolError(
                "Websocket server did not return a wsIdentity response"
            )

    def _parse_answer(
        self, answer: str
    ) -> Tuple[str, Optional[int], Optional[ExecutionResult]]:
        """Parse a raw message into ``(type, id, execution_result)``.

        ``wsIdentity`` and ``request-full-subscription`` messages are handled
        here, with the value of ``wsIdentity`` or ``id`` passed through as
        received; any other message goes to ``_parse_answer_apollo``.

        Raises:
            TransportProtocolError: if ``answer`` is not valid JSON.
        """
        try:
            json_answer = json.loads(answer)
        except ValueError as err:
            raise TransportProtocolError(
                f"Server did not return a GraphQL result: {answer}"
            ) from err

        if "wsIdentity" in json_answer:
            return ("wsIdentity", json_answer["wsIdentity"], None)
        elif (
            "type" in json_answer and json_answer["type"] == "request-full-subscription"
        ):
            return ("request-full-subscription", json_answer["id"], None)
        else:
            return self._parse_answer_apollo(json_answer)

    async def send_full_subscription(self, answer_id: str):
        """Send the stored full query for ``answer_id``.

        Raises:
            Exception: if no query was stored under ``answer_id``.
        """
        if answer_id not in self.saved_full_subscriptions:
            raise Exception(f"Full subscription not found for id {answer_id}")

        payload = self.saved_full_subscriptions[answer_id]
        query_str = json.dumps(
            {"id": answer_id, "type": "full-subscription", "payload": payload}
        )
        await self._send(query_str)

    async def _handle_answer(
        self,
        answer_type: str,
        answer_id: Optional[int],
        execution_result: Optional[ExecutionResult],
    ) -> None:
        """Answer full-subscription requests; delegate everything else."""
        if answer_type == "request-full-subscription":
            await self.send_full_subscription(answer_id)
        else:
            await super()._handle_answer(answer_type, answer_id, execution_result)
async def main():
    """Subscribe to the group scores of one tournament and print each result.

    Connects with ``EuropeanTourWebsocketsTransport`` and iterates over the
    subscription results until the subscription ends.
    """
    transport = EuropeanTourWebsocketsTransport(
        url="wss://btec-websocket.services.imgarena.com",
        init_payload={
            "accept-language": "en",
            "ec-version": "5.1.88",
            "operator": "europeantour",
            "referrer": "https://www.europeantour.com/",
            "sport": "GOLF",
        },
    )

    async with Client(
        transport=transport, fetch_schema_from_transport=False
    ) as session:
        # The query text is kept exactly as in the original snippet.
        query = gql(
            """
subscription ShotTrackerSubscribeToGolfTournamentGroupScores($input: SubscribeToGolfTournamentGroupScoresInput!) {
subscribeToGolfTournamentGroupScores(input: $input) {
groupId
l1Course
teamId
players {
id
lastName
firstName
}
roundScores {
courseId
roundNo
toParToday {
value
}
holesThrough {
value
}
startHole
holes {
holePar
holeStrokes
holeOrder
holeNumber
}
isPlayoff
}
toPar {
value
}
tournamentPosition {
format
value
displayValue
}
status
}
}
"""
        )

        variables = {
            "input": {
                "teamId": 21,
                "tournamentId": 488,
                "roundNo": 4,
            },
        }

        # subscription_name and event_id are the extra keyword arguments
        # taken by EuropeanTourWebsocketsTransport.subscribe.
        async for result in session.subscribe(
            query,
            operation_name="ShotTrackerSubscribeToGolfTournamentGroupScores",
            variable_values=variables,
            subscription_name="subscribeToGolfTournamentGroupScores",
            event_id=488,
        ):
            print(result)


if __name__ == "__main__":
    asyncio.run(main())