
How can I tell whether two sockets are connected to the same channel-layer group in Django Channels?


I am trying to build a system in which two entities (a doctor and a patient) share the same data over WebSockets using Django.
I have set up my channels so that the auth token is sent through the query string of the WebSocket URL.
The models are configured as follows:

[image: model diagram, not available]

The patient model also has an attribute called "grp_id": grp_id = models.UUIDField(default=uuid.uuid4, editable=False)
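A minimal sketch of why a stringified grp_id is usable as a group name, assuming the rule that Channels states for group names (ASCII alphanumerics, hyphens, underscores, or periods, with a length under 100). The helper is_valid_group_name and the constants below are hypothetical restatements of that rule for illustration; they are not Channels' own validator.

```python
import re
import uuid

# Assumption: this pattern restates the group-name rule described above.
# It is a hypothetical helper, not part of the Channels library.
GROUP_NAME_RE = re.compile(r"[a-zA-Z0-9\-_.]+")
MAX_NAME_LENGTH = 100


def is_valid_group_name(name) -> bool:
    """Return True if `name` looks acceptable as a channel-layer group name."""
    return (
        isinstance(name, str)
        and len(name) < MAX_NAME_LENGTH
        and GROUP_NAME_RE.fullmatch(name) is not None
    )


grp_id = uuid.uuid4()  # the kind of value UUIDField(default=uuid.uuid4) stores

print(is_valid_group_name(str(grp_id)))  # hex digits and hyphens only
print(is_valid_group_name(grp_id))       # a UUID object is not a str
print(is_valid_group_name("room 1"))     # spaces are rejected
```

This is why the consumer below converts grp_id with str() before passing it to group_add.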


The consumers.py file works as follows
• Flow for a patient

  1. Make a connection request, sending the auth token through the query string, which authenticates the user
  2. Since the user is a patient, the patient's grp_id is fetched
  3. The connection joins a channel-layer group named after the grp_id value
  4. The patient triggers the start_sepsis function, which receives a set of sepsis attributes, serializes them and stores them in the DB
  5. The same serialized data is broadcast over that group

• Flow for a doctor

  1. Authentication as above
  2. The doctor fetches the patients associated with them using the _get_patient_of_doctor helper function
  3. The doctor tries to join the grp_id group of every patient associated with them
  4. Once connected, the doctor broadcasts a message saying "doc is connected"
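Step 1 of both flows relies on reading the token from the query string. The authentication middleware is not shown in the question, so the following is only a sketch of the parsing part, assuming the token arrives as ?token=... and using a hypothetical helper name, token_from_scope. In an ASGI scope the query string is raw bytes, so it has to be decoded before parsing.

```python
from typing import Optional
from urllib.parse import parse_qs


def token_from_scope(scope: dict) -> Optional[str]:
    """Extract the ?token=... value from an ASGI scope, or None if absent."""
    # ASGI delivers the query string as bytes, e.g. b"token=abc123".
    raw = scope.get("query_string", b"")
    params = parse_qs(raw.decode("utf-8"))
    values = params.get("token")
    return values[0] if values else None


# Hypothetical scope, shaped like the one a WebSocket consumer receives.
scope = {
    "type": "websocket",
    "path": "/sepsisDynamic/",
    "query_string": b"token=abc123",
}
print(token_from_scope(scope))  # abc123
```

Validating the token and setting scope['user'] would happen after this step and depends on the token scheme in use.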

class SepsisDynamicConsumer(AsyncJsonWebsocketConsumer):

    groups = ['test']

    @database_sync_to_async
    def _get_user_group(self, user):
        return user.user_type

    @database_sync_to_async
    def _get_user_grp_id(self, user):
        #************** THE grp function initiated **************
        print("THE USER obj", user)#output below
        print("email", user.email)#output below
        if "PATIENT" == user.user_type:
            return str(user.patient_set.all()[0].grp_id)
        elif "DOCTOR" == user.user_type:
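            # Build the list here so the query runs inside the sync thread,
            # not lazily when the caller iterates in the event loop.
            return [str(grp_id) for grp_id in user.doctor_set.all()[0].patient_set.all().values_list('grp_id', flat=True)]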
        else:
            print("THE USER IS SOMETHING ELSE")

    @database_sync_to_async
    def _get_patient_of_doctor(self, user):
        x = user.doctor_set.all()
        doc_obj = x[0]
        pat_ids = doc_obj.patient_set.all().values_list('grp_id', flat=True)
        return pat_ids

    @database_sync_to_async
    def _created_sepsis_data(self, data):
        """[summary]
            This helper function would generate and return the 
        """
        x = get_user_model().objects.get(id=data['patient'])
        x = x.patient_set.values('id')[0]['id']
        data.update({'patient': x})
        serializer = SepsisPatientSerializer(data=data)
        serializer.is_valid(raise_exception=True)
        x = serializer.create(serializer.validated_data)
        return SepsisPatientSerializer(x).data

    async def connect(self):
        user = self.scope['user']
        print("THE USER IS ------>", user)#output below
        if user.is_anonymous:
            print("user was unknown")
            await self.close()
        else:
            if user.user_type == 'PATIENT':
                pat_grp_id = await self._get_user_grp_id(user)
                await self.channel_layer.group_add(
                    group=pat_grp_id,
                    channel=self.channel_name
                )
                print("CONNECT TO ---------> ", pat_grp_id)
            elif user.user_type == 'DOCTOR':
                for doc_pat_grp_id in await self._get_user_grp_id(user):
                    print("Doc connected --------->", doc_pat_grp_id)
                    await self.channel_layer.group_add(
                        group=doc_pat_grp_id,
                        channel=self.channel_name
                    )
                print("Doc connected ", doc_pat_grp_id)
            await self.accept()

    async def start_sepsis(self, message):
        data = message.get('data')
        sepsis_generated_and_saved_data = await self._created_sepsis_data(data)
        await self.send_json({
            'type': 'echo.message',
            'data': sepsis_generated_and_saved_data
        })

    async def disconnect(self, code):
        user = self.scope['user']
        if user.is_anonymous:
            await self.close()
        else:
            if user.user_type == 'PATIENT':
                pat_grp_id = await self._get_user_grp_id(user)
                await self.channel_layer.group_discard(
                    group=pat_grp_id,
                    channel=self.channel_name
                )
                await super().disconnect(code)

    async def echo_message(self, message):
        await self.send_json(message)

    async def receive_json(self, content, **kwargs):
        message_type = content.get('type')
        if message_type == 'start.sepsis':
            await self.start_sepsis(content)
        if message_type == 'echo.message':
            await self.send_json({
                'type': message_type,
                'data': content.get('data'),
            })

The problem: I run a test that goes in the following order

  1. create a patient
  2. send a websocket connection request to the server
  3. in the patient's DB record, I have added a "grp_id" attribute that serves as a unique ASCII string naming the django-channel-layer group to which the patient is connected
  4. connect a doctor
  5. send a websocket connection request to the server
  6. send an "echo.message" through the websocket from the patient with data: "Patient got connected"
  7. send an "echo.message" through the websocket from the doctor with data: "doctor got connected"

Whenever the test runs, the patient receives only the data that the patient sent, and the doctor receives only the data that the doctor sent, even though I am using a django-channels broadcast function, which should send the data to all connected users.
Please also address this: what if multiple patients are associated with a doctor, and the doctor ends up with only the last patient in the queryset because of the "for loop"? Right now, only one doctor is associated with every patient.

My test

async def test_patient_doctor_on_same_channel(self, settings):
        settings.CHANNEL_LAYERS = TEST_CHANNEL_LAYERS
        # create doctor
        doctor_user, doctor_access = await create_user(
            'test_Doctor.user@example.com', 'pAssw0rd', 'DOCTOR', 'testDoctor_username'
        )
        # create patient
        user, access = await create_user(
            'test.user@example.com', 'pAssw0rd', 'PATIENT', 'test_username'
        )
        # connect patient
        communicator = WebsocketCommunicator(
            application=application,
            path=f'/sepsisDynamic/?token={access}'
        )
        connected, _ = await communicator.connect()

        # connect doctor
        doctor_communicator = WebsocketCommunicator(
            application=application,
            path=f'/sepsisDynamic/?token={doctor_access}'
        )
        # the doctor connects to the online patient
        doctor_connected, _ = await doctor_communicator.connect()

        # Simply echo a message
        message = {
            'type': 'echo.message',
            'data': 'This is a test message.'
        }

        await communicator.send_json_to(message)

        # checking if patient received the data on their end
        response = await communicator.receive_json_from()
        assert response == message

        # checking if doctor received the patient's data on their end
        response_doc = await doctor_communicator.receive_json_from()
        assert response_doc == message

        await communicator.disconnect()
        await doctor_communicator.disconnect()

My intuition says that the two sockets are not connected to the same group, but I don't know why.
The output of the test

THE USER IS ------> test_username
************** THE grp function initiated **************
THE USER obj test_username
email test.user@example.com
CONNECT TO --------->  1b2a455c-28b0-4c4d-9d26-40f6a4634fa9
**********************************************************
THE USER IS ------> testDoctor_username
THE DOC CONDITION RAN
************** THE grp function initiated **************
THE USER obj testDoctor_username
email test_Doctor.user@example.com
THE patient's unicode  <QuerySet [UUID('1b2a455c-28b0-4c4d-9d26-40f6a4634fa9')]>
Doc connected ---------> 1b2a455c-28b0-4c4d-9d26-40f6a4634fa9
Doc connected  1b2a455c-28b0-4c4d-9d26-40f6a4634fa9
THE RECEIVE FUNCTION RAN
THE MESSAGE TYPE echo.message
******The error******
TestWebSocket::test_patient_doctor_on_same_channel - asyncio.exceptions.TimeoutError
response_doc = await doctor_communicator.receive_json_from()

Please help.

Thank you.


Solution

  • You're using send_json, which sends only to the client on the same channel, that is, the one connected to this consumer. To send to a group, you have to use the channel_layer.group_send method, as shown in the chat example in the Channels docs. The "type" of the message names the handler method that runs on each channel in the group, and that handler then pushes the message down to its client. One issue with your architecture is that the doctor's channel can be connected to many patient groups, so there is no way of knowing which particular patient the doctor is sending a message to. One way to solve that is to forward the patient group names to the frontend so that it can specify the target group name when sending a message. Then you can send the message like this:

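    async def receive_json(self, content, **kwargs):
        user = self.scope['user']
        message_type = content.get('type')
        if message_type == 'start.sepsis':
            await self.start_sepsis(content)
        if message_type == 'echo.message':
            if user.user_type == 'DOCTOR':
                # A doctor can be in several patient groups, so the
                # client has to name the group it wants to reach.
                group_name = content['group_name']
            else:
                group_name = await self._get_user_grp_id(user)
            # Deliver to every channel in the group; the 'type' value
            # 'chat.message' is routed to the chat_message handler.
            await self.channel_layer.group_send(
                group_name,
                {
                    'type': 'chat.message',
                    'data': content.get('data'),
                },
            )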
    
    async def chat_message(self, event):
        """
        Called when someone has messaged our chat.
        """
        # Send a message down to the client
        await self.send_json(
            {
                "message": event["data"],
            }
        )
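To make the difference between send_json and group_send concrete, here is a small in-memory model that runs without Django or Channels. ToyLayer and ToyConsumer are hypothetical stand-ins written for this illustration: a real channel layer delivers messages through a backend rather than by calling methods directly. The sketch only mirrors two behaviours described above, namely that send_json reaches just the sender's own client, and that group_send runs the handler named by "type" (with dots replaced by underscores) on every channel in the group.

```python
import asyncio
from collections import defaultdict


class ToyLayer:
    """In-memory stand-in for a channel layer (illustration only)."""

    def __init__(self):
        self.groups = defaultdict(set)  # group name -> channel names
        self.consumers = {}             # channel name -> consumer

    async def group_add(self, group, channel):
        self.groups[group].add(channel)

    async def group_send(self, group, message):
        # "chat.message" is dispatched to the chat_message handler.
        handler_name = message["type"].replace(".", "_")
        for channel in sorted(self.groups[group]):
            await getattr(self.consumers[channel], handler_name)(message)


class ToyConsumer:
    """Stand-in for one WebSocket consumer and its connected client."""

    def __init__(self, name, layer):
        self.channel_name = name
        self.outbox = []                # frames this client would receive
        layer.consumers[name] = self

    async def send_json(self, content):
        # Reaches only the client attached to this consumer.
        self.outbox.append(content)

    async def chat_message(self, event):
        await self.send_json({"message": event["data"]})


async def demo():
    layer = ToyLayer()
    patient = ToyConsumer("patient", layer)
    doctor = ToyConsumer("doctor", layer)
    group = "1b2a455c-28b0-4c4d-9d26-40f6a4634fa9"
    await layer.group_add(group, patient.channel_name)
    await layer.group_add(group, doctor.channel_name)

    # What the question's code does: only the sender sees the message.
    await patient.send_json({"type": "echo.message", "data": "hi"})
    after_send_json = (list(patient.outbox), list(doctor.outbox))

    # What group_send does: every member's handler runs.
    await layer.group_send(group, {"type": "chat.message", "data": "hi"})
    return after_send_json, (patient.outbox, doctor.outbox)


after_send_json, after_group_send = asyncio.run(demo())
print(after_send_json)   # the doctor's outbox is still empty here
print(after_group_send)  # both outboxes now contain {"message": "hi"}
```

Note that the chat_message handler above wraps the payload as {"message": ...}, so the test in the question would need to compare against that shape rather than against the original message dict.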