Search code examples
javascriptwebsocketpython-asynciodjango-channels

How to receive continuous stream of data


I have create a django-server(using django-channels) from which a continuous stream of data would be sent on the channel-layer where the client is connected on.

The below code represent the client, in which the "generate.sepsis" will trigger the function on the server-side to send json on the channel; I am simply receiving all the transmitted data from the server and printing it into the console.

async def receive_data_from_start_sepsis():
    ws_pat=websocket.WebSocket()
    ws_pat.connect('ws://localhost:8000/sepsisDynamic/?token=1fe10f828b00e170b3a9c5d41fc168a31facefc3')
    #time.sleep(7)
    await ws_pat.send(json.dumps({
    'type':'generate.sepsis',
    'data': {
                "heart_rate": 55,
                "oxy_saturation": 26.5,
                "temperature": 50,
                "blood_pressure": 95.48,
                "resp_rate": 156,
                "mean_art_pre": 85,
                "user_id": 15 
            }
    }))
    #time.sleep(2)
    while True:
        greeting = await ws_pat.recv()
        print(f"< {greeting}")
        asyncio.sleep(2)

# asyncio.run(receive_data_from_start_sepsis())
try:
    asyncio.get_event_loop().run_forever()
finally:
    asyncio.get_event_loop().run_until_complete(receive_data_from_start_sepsis())

but I get the following error

---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-6-9fdd3245dd6e> in <module>
     24 try:
---> 25     asyncio.get_event_loop().run_forever()
     26 finally:

~\anaconda3\lib\asyncio\base_events.py in run_forever(self)
    524         if self.is_running():
--> 525             raise RuntimeError('This event loop is already running')
    526         if events._get_running_loop() is not None:

RuntimeError: This event loop is already running

During handling of the above exception, another exception occurred:

RuntimeError                              Traceback (most recent call last)
<ipython-input-6-9fdd3245dd6e> in <module>
     25     asyncio.get_event_loop().run_forever()
     26 finally:
---> 27     asyncio.get_event_loop().run_until_complete(receive_data_from_start_sepsis())

~\anaconda3\lib\asyncio\base_events.py in run_until_complete(self, future)
    568         future.add_done_callback(_run_until_complete_cb)
    569         try:
--> 570             self.run_forever()
    571         except:
    572             if new_task and future.done() and not future.cancelled():

~\anaconda3\lib\asyncio\base_events.py in run_forever(self)
    523         self._check_closed()
    524         if self.is_running():
--> 525             raise RuntimeError('This event loop is already running')
    526         if events._get_running_loop() is not None:
    527             raise RuntimeError(

RuntimeError: This event loop is already running

But when the async code on the server completes its (iteration of)sending data; the socket receives all the data like so. (which is the very first data item sent.)

{"type": "echo.message", "data": {"id": 147, "heart_rate": 155.0, "oxy_saturation": 150.0, "temperature": 43.0, "blood_pressure": 94.0, "resp_rate": 174.0, "mean_art_pre": 186.0, "patient": 10}}

The async function in django is:-

async def generating_patient_sepsis(self, message):
        # get the data from message
        data = message.get('data')
        print(f"THE INITIAL DATA {data}")
        # get the patient's id
        get_pat_id_in_data = await self._convert_user_id_to_patient_id(data)
        data = get_pat_id_in_data
        while True:
            time.sleep(5)
            await asyncio.sleep(1)
            # random sepsis data generated and `data` variable is mutated
            data.update({'heart_rate': random.randint(24, 200)})
            data.update({'oxy_saturation': random.randint(24, 200)})
            data.update({'temperature': random.randint(24, 200)})
            data.update({'blood_pressure': random.randint(24, 200)})
            data.update({'resp_rate': random.randint(24, 200)})
            data.update({'mean_art_pre': random.randint(24, 200)})
            print(f"THE DATA  --> {data}")
            # serializing and saving the data
            x = await self.serializer_checking_saving_data(data)
            # send the data to the channel
            await self.channel_layer.group_send(
                group=self.pat_grp_id,
                message={
                    'type': 'echo.message',
                    'data': x
                }
            )

I also want to learn how I would receive the same data in javascript so that I can represent the changes in a dynamic graphs fashion;

enter image description here


Solution

  • the reason why the test failed at

    greeting = await ws_pat.recv()
    

    is because the websocket receive function is a synchronous function and I was trying to await it. If I choose to remove the await keyword it would receive the stream of data but it only hold the the very first value in that stream of json-data.

    the way I was able to receive those json-data is by defining a function which is asynchronous and receiving the websocket data their; so whenever the websocket data will be broadcasted from the server.

    async def receive_sepsis(ws_pat):
        return ws_pat.recv()
    

    The issue with this implementation is the event-loop for broadcasting never gets completed because the while-loop is True; thereby all the json-data that is broadcasted from the server to the group(ie self.pat_grp_id) gets clogged up at the

    await self.channel_layer.group_send(<grp-name>,<message>)
    

    Can anyone please help me, how should I actually work on this;
    all I want to is connect patient and doctor to the same group(ie a grp_id attribute inside the patient model, which is a UUID field); once connected the patient will request "start_diagnosis" on the websocket; which will generates pseudo-random data of the illness; gets saved in the DB, returns the serialized version of the same data and then broadcasts it to the group-name with grp_id of the patient.