Tags: python, python-3.x, sockets, multiprocessing, fork

How to use incoming data stream at a socket for multiple parallel processes in Python?


import socket

ip_addr = '100.100.1.1'
port_num = 5000

socket_obj = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
socket_obj.bind((ip_addr, port_num))
socket_obj.settimeout(2)

I have created this socket object, which has an incoming data stream arriving at a certain rate. I want to use that data in real time for two processes running in parallel.

def process1():
    while True:
        try:
            new_data = socket_obj.recvfrom(50)  # each datagram is 50 bytes
            some_process(new_data)
        except socket.timeout:
            break

def process2():
    while True:
        try:
            new_data = socket_obj.recvfrom(50)
            some_other_process(new_data)
        except socket.timeout:
            break

Running either process on its own works flawlessly, but how do I make sure that two processes running in parallel can read from the same socket without significant lag or loss of data in either of the two streams?

The incoming data is very deterministic: exactly 50 bytes arrive 1000 times per second. I have set a timeout of 2 seconds so that the processes end once the socket has not received any data for 2 seconds.

Also, each process needs access to each and every data packet that arrives at the socket.


Solution

  • I would solve the problem by creating two Process instances, passing each its own multiprocessing.Queue instance. The main process reads from the socket and puts each message it reads on both queues for processing. There is a bit of overhead in writing to and reading from these queues, which could slow the maximum processing rate a bit, so it becomes a question of whether the processing can keep up with the incoming data. But you clearly cannot have each process reading from the socket in parallel: every recvfrom call consumes a datagram, so each process would see only some of the packets. The following code shows the approach; the emulation I ran on my desktop follows.

    import socket
    import multiprocessing
    
    def some_process(q):
        while True:
            data = q.get()
            if data is None:
                break
            # process data:
            ...
    
    def some_other_process(q):
        while True:
            data = q.get()
            if data is None:
                break
            # process data:
            ...
    
    def main():
        ip_addr = '100.100.1.1'
        port_num = 5000
    
        socket_obj = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
        socket_obj.bind((ip_addr, port_num))
        socket_obj.settimeout(2)
    
        q1 = multiprocessing.Queue()
        p1 = multiprocessing.Process(target=some_process, args=(q1,))
        q2 = multiprocessing.Queue()
        p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
        p1.start()
        p2.start()
        while True:
            try:
                new_data = socket_obj.recvfrom(50)  # each datagram is 50 bytes
            except socket.timeout:
                break
            else:
                q1.put(new_data)
                q2.put(new_data)
        # sentinel values tell each worker to exit once its queue drains:
        q1.put(None)
        q2.put(None)
        p1.join()
        p2.join()
    
    # Required if running under Windows:
    if __name__ == '__main__':
        main()
    

    Emulation On My Desktop

    I ran the following emulation on my not-so-fast desktop to see what rate I could sustain with trivial processing functions, given the overhead of writing those 50-byte data items to a multiprocessing queue and reading them back:

    import multiprocessing
    
    def some_process(q):
        while True:
            data = q.get()
            if data is None:
                break
            # process data:
            ...
    
    def some_other_process(q):
        while True:
            data = q.get()
            if data is None:
                break
            # process data:
            ...
    
    def main():
        import time
        q1 = multiprocessing.Queue()
        p1 = multiprocessing.Process(target=some_process, args=(q1,))
        q2 = multiprocessing.Queue()
        p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
        p1.start()
        p2.start()
        t1 = time.time()
        for _ in range(10_000):
            # Next put should be in .001 seconds for a hoped-for rate of 1000/sec.
            expiration = time.time() + .001
            q1.put('x' * 50)  # 50-byte payload
            q2.put('x' * 50)
            diff = expiration - time.time()
            if diff > 0:
                time.sleep(diff)
        # sentinel values tell the workers to exit:
        q1.put(None)
        q2.put(None)
        rate = 10_000 / (time.time() - t1)
        print('Done:', rate)
        p1.join()
        p2.join()
    
    # Required if running under Windows:
    if __name__ == '__main__':
        main()
    

    Prints:

    Done: 614.8320395921962
    

    I could only sustain a rate of roughly 615 messages/second. If you write to the queues faster than the messages can be processed, the queues grow without bound and memory will eventually be exhausted. This is not a good thing.
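
    One way to guard against that (my own addition, not part of the original answer) is to bound the queues and decide explicitly what should happen when a consumer falls behind. Here is a minimal sketch, assuming it is acceptable to drop datagrams rather than stall the socket reader; the maxsize of 10_000 is an arbitrary illustrative value:

    import multiprocessing
    import queue  # for the queue.Full exception raised by put_nowait

    def consumer(q):
        while True:
            data = q.get()
            if data is None:
                break
            # process data:
            ...

    def main():
        # A bounded queue caps memory use at maxsize items.
        q1 = multiprocessing.Queue(maxsize=10_000)
        p1 = multiprocessing.Process(target=consumer, args=(q1,))
        p1.start()
        dropped = 0
        for _ in range(100_000):
            try:
                q1.put_nowait('x' * 50)  # non-blocking put
            except queue.Full:
                dropped += 1  # drop the datagram instead of blocking
        q1.put(None)  # sentinel: tell the consumer to exit
        p1.join()
        print('Dropped:', dropped)

    if __name__ == '__main__':
        main()

    Whether dropping, blocking, or coalescing is the right policy depends on the application; the point is that a bounded queue forces that decision instead of letting memory grow silently.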

    Update

    The above emulation seemed somewhat suspect to me. I determined in the following benchmark that I could write to the queue at an extremely high rate (208,317 messages/sec.) and that reading and processing the messages (again with trivial processing) could be done at a high rate (23,094 messages/sec.). I must conclude that my previous emulation was inaccurate, due to the time.sleep function being rather imprecise.

    import multiprocessing
    
    def some_process(q):
        while True:
            data = q.get()
            if data is None:
                break
            # process data:
            ...
    
    def some_other_process(q):
        while True:
            data = q.get()
            if data is None:
                break
            # process data:
            ...
    
    def main():
        import time
    
        q1 = multiprocessing.Queue()
        p1 = multiprocessing.Process(target=some_process, args=(q1,))
        q2 = multiprocessing.Queue()
        p2 = multiprocessing.Process(target=some_other_process, args=(q2,))
        p1.start()
        p2.start()
        t1 = time.time()
        for _ in range(10_000):
            # No pacing this time; put as fast as possible.
            q1.put('x' * 50)  # 50-byte payload
            q2.put('x' * 50)
        # sentinel values tell the workers to exit:
        q1.put(None)
        q2.put(None)
        rate = 10_000 / (time.time() - t1)
        print('Done. Put Rate:', rate)
        p1.join()
        p2.join()
        rate = 10_000 / (time.time() - t1)
        print('Done. Processing Rate:', rate)
    
    
    # Required if running under Windows:
    if __name__ == '__main__':
        main()
    

    Prints:

    Done. Put Rate: 208317.3903110131
    Done. Processing Rate: 23094.772557205524
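
    A quick way to test the time.sleep theory (my addition, not part of the original benchmarks) is to pace the loop with time.perf_counter instead: sleep for most of each interval, then spin for the remainder. The ~2 ms spin margin below is an arbitrary choice:

    import time

    def paced_puts(q1, q2, n=10_000, interval=0.001):
        # Pace puts `interval` seconds apart without relying on
        # time.sleep's coarse granularity: sleep for the bulk of the
        # wait, then busy-wait on time.perf_counter for the last ~2 ms.
        next_tick = time.perf_counter()
        for _ in range(n):
            q1.put('x' * 50)
            q2.put('x' * 50)
            next_tick += interval
            remaining = next_tick - time.perf_counter() - 0.002
            if remaining > 0:
                time.sleep(remaining)  # coarse sleep
            while time.perf_counter() < next_tick:
                pass  # spin for the final fraction of a millisecond

    If the sustained rate with this pacing comes out close to 1000 messages/sec., that supports the conclusion that time.sleep's granularity, not queue overhead, limited the first emulation.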