Search code examples
pythonwebsocketprocessmultiprocessingpython-asyncio

Different Processes between websocket datastream and computational code


I have a datastream from multiple websockets and convert the necessary data to two lists(this whole part uses asyncio). I end up creating, updating and returning the lists like this:

class A:
    def __init__(self):
        self.list_x = [0,0,0,0,0]
        self.list_y = [0,0,0,0,0]
    
    async def updateList(self, var1, var2, i):
        self.list_x[i] = var1
        self.list_y[i] = var2

    async def returnLists():
        return self.list_x, self.list_y

t = A()

#this is the callback function for the responses i get.
async def dataCallback(data, receipt_timestamp):
    if data.name == 'B':
        await t.updateList(data.info[0], data.info[1], 0)
    elif data.name == 'C':
        await t.updateList(data.info[0], data.info[1], 1)
    elif data.name == 'D':
        await t.updateList(data.info[0], data.info[1], 2)
    elif data.name == 'E':
        await t.updateList(data.info[0], data.info[1], 3)
    elif data.name == 'F':
        await t.updateList(data.info[0], data.info[1], 4)   

I would like to then use the data in these lists(which is updated roughly every 25ms) in a seperate Process to do calculations. I have done research but im finding different answers so im unsure if it is even possible to feed the constantly updated lists to the new Process or if it's even a good idea to do.

I would love to hear some input on this matter. If it's possible to do, could you explain how or give a link to what is needed. If its not, whats the best way around it?


Solution

  • I would consider something along these lines. Move the functionality of class A into a separate process. Add the calculation to class A.

    Pass the raw data to the subprocess using a multiprocessing.Queue. The second process loops to collect new data and hand if to class A for analysis.

    When the main process gets some new data, it simply places it on the queue and waits for the next batch.

    Approximately like this:

    # A child Process
    def process2(q: multiprocessing.Queue):
        a = A()
        while True:
            var1, var2, i = q.get()
            a.do_calculation(var1, var2, i)
    
    class A:
        def __init__(self):
            self.list_x = [0,0,0,0,0]
            self.list_y = [0,0,0,0,0]
        
        def do_calculation(self, var1, var2, i):
            self.list_x[i] = var1
            self.list_y[i] = var2
            # Some number crunching here
            
    # In the main Process:
    def update_list(q: multiprocessing.Queue, var1, var2, i):
        # There is no reason for this to be an async function
        q.put((var1, var2, i))
    

    As for this comment: "#I just have A instead of self because i was fooling around between two classes which were run in separate instances.". Would it be too much trouble to clean up your code before posting it to this site? You are asking other people - perhaps hundreds of them - to read through it and comment on it. Please have some consideration for their time.