Tags: python, parallel-processing, multiprocessing, python-multiprocessing

Python run multiple programs in parallel


Say I have thousands of sensor readings that get added to a queue like Redis or Apache Kafka, and I want each sensor_id to get its own worker that runs calculations on the historic readings. Each sensor_id would be published to its own new topic, which would then start streaming that sensor's readings.

Each worker would start off by reading from a database to instantiate some of its variable thresholds. The code is identical for EACH worker aside from its sensor_id and variable thresholds retrieved using the sensor_id as the lookup key. Once a sensor_id is added to the queue, it is assigned a worker that runs forever.

Would it be more efficient to do:

  1. multiprocessing.Pool to launch thousands of workers that run indefinitely?
  2. Or running "python script.py param1 param2 &" thousands of times, changing the params for each instance? (I would generate a bash script programmatically to include all of this logic, FYI.)

I want all the workers to run in parallel as fast as possible; these are CPU-bound tasks, not I/O-bound. Which is the best way to launch all the workers?

Clarification: Sensor readings are generated every second per sensor_id, so for 3,000 sensors there are 3,000 events generated per second and 3,000 topics inside the queue. An example payload is JSON: {sensor_id: "hash", temperature: 85, rpm: 1200, ...}. My ideal setup is each worker maintaining the last 200 readings or so in memory to run its calculations. The alternative is one central queue, where recycled workers would first have to make a db connection to read the 200 readings for the sensor_id they popped off the queue, but this takes time.
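
Something like this is what I have in mind for the in-memory part, just as a rough sketch (handle_message and the rolling-average calculation are placeholders, not my real code):

    import json
    from collections import defaultdict, deque
    
    WINDOW_SIZE = 200  # keep roughly the last 200 readings per sensor
    
    # one bounded window per sensor_id; the oldest reading falls off automatically
    windows = defaultdict(lambda: deque(maxlen=WINDOW_SIZE))
    
    def handle_message(raw_message):
        reading = json.loads(raw_message)   # e.g. {"sensor_id": "hash", "temperature": 85, "rpm": 1200}
        window = windows[reading["sensor_id"]]
        window.append(reading)
        # run the calculations over the in-memory history, e.g. a rolling average:
        return sum(r["temperature"] for r in window) / len(window)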


Solution

  • My understanding is that Redis lists are more reliable than Publish/Subscribe, which is better suited to the case where a single message needs to be consumed by multiple consumers. Your application would also be greatly simplified if all the sensors wrote to the same list and you then had a processing pool of identical workers reading from that list in a loop. The message itself identifies which sensor is involved, and when a worker reads a message for a sensor id it has not seen before, it does a "first time" initialization for that sensor by reading the pertinent information from the database and saving it in a dictionary keyed by the sensor id. Ultimately, that dictionary could end up with 3,000 entries, which leads to the conclusion that the pool should be initialized once, before the workers even start reading messages, with a dictionary of all 3,000 entries that every worker can access. (A minimal sketch of this simpler single-list approach follows the code at the end of this answer.)

    If, however, for whatever reason all 3,000 sensors must write to 3,000 different Redis lists (assuming you are using Redis lists to begin with), the following code can be used. The idea is then to find a way to read "simultaneously" from 3,000 lists, retrieving the messages as they become available and writing them to a single queue that the workers read from, thus simplifying the worker logic. This code is based on recipe 12.13, Polling Multiple Thread Queues, in Python Cookbook, 3rd Edition by David Beazley and Brian K. Jones, available here. It has been adapted to poll 3,000 Redis lists and to send the items read to a single multiprocessing.Queue instance. The code also includes a producer process emulating the creation of sensor readings that are added to one of the 3,000 lists, which you would not have in your actual code; in your case, the 3,000 PollableList instances would have to be accessible to whatever code is responsible for getting the sensor readings. Likewise, there is only a single Process here reading the message objects from the queue and printing them, so as to keep the printing "orderly." In reality, as mentioned above, you would have a pool of processes.

    Unfortunately, the technique described in the Cookbook seems to have a limitation on how large the list passed to select can be. To stay on the safe side I limited each list to 500 entries, which meant breaking the 3,000 Redis lists into 6 groups of 500 and polling each group from its own thread.

    import redis
    import socket
    import os
    import json
    import select
    from multiprocessing import Process, Queue
    from multiprocessing.pool import ThreadPool
    from functools import partial
    
    class PollableList:
        # a single Redis connection shared by all PollableList instances
        r = redis.Redis()
    
        def __init__(self, idx):
            self.idx = idx
            self.list_name = f'sensor_{idx}'
            # Create a pair of connected sockets
            if os.name == 'posix':
                self._putsocket, self._getsocket = socket.socketpair()
            else:
                # Compatibility on non-POSIX systems
                server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                server.bind(('127.0.0.1', 0))
                server.listen(1)
                self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                self._putsocket.connect(server.getsockname())
                self._getsocket, _ = server.accept()
                server.close()
    
        def fileno(self):
            # lets select() wait on this object as though it were a socket
            return self._getsocket.fileno()

        def put(self, item):
            # push the item onto the Redis list, then send one byte to signal readability
            PollableList.r.rpush(self.list_name, json.dumps(item))
            self._putsocket.send(b'x')

        def get(self):
            # consume the one-byte signal, then pop the corresponding item from Redis
            self._getsocket.recv(1)
            return json.loads(PollableList.r.lpop(self.list_name).decode())
    
    def producer(lists):
        """ emulate the 3000 sensors """
        # Feed data to the lists (we won't run indefinitely)
        for _ in range(3):
            for lst in lists:
                lst.put({'id': lst.idx, 'value': lst.idx * 100 + 1})
    
    def consumer(q, lists):
        '''
        Consumer that reads data from multiple lists "simultaneously"
        '''
        while True:
            can_read, _, _ = select.select(lists, [], [])
            for r in can_read:
                item = r.get()
                q.put(item)
    
    # in actual use case, there would be a pool of workers:
    def worker(q):
        message_number = 0
        while True:
            item = q.get()
            message_number += 1
            print(message_number, item)
    
    
    def main():
        lists = [PollableList(i) for i in range(3000)]
        # select cannot handle all 3000 sockets at once:
        lists1 = lists[0:500]
        lists2 = lists[500:1000]
        lists3 = lists[1000:1500]
        lists4 = lists[1500:2000]
        lists5 = lists[2000:2500]
        lists6 = lists[2500:3000]
    
        p0 = Process(target=producer, args=(lists,))
        p0.daemon = True
        p0.start()
    
        q = Queue()
        # six polling threads, one per group of 500 lists:
        thread_pool = ThreadPool(6)
        thread_pool.map_async(partial(consumer, q), [lists1, lists2, lists3, lists4, lists5, lists6])
    
        # This would in reality be a process pool of workers reading from q:
        p1 = Process(target=worker, args=(q,))
        p1.daemon = True
        p1.start()
    
        # wait for all 9000 messages to be displayed by worker:
        input('Hit enter to terminate...\n')
    
    # required for Windows:
    if __name__ == '__main__':
        main()
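
    For completeness, here is a minimal sketch of the simpler single-list approach described at the top of this answer: every sensor pushes its JSON reading onto one shared Redis list, and a pool of identical workers consumes that list with a blocking pop, each worker having been initialized up front with the per-sensor thresholds. The list name sensor_readings, the load_all_thresholds function and the threshold fields are placeholders standing in for your actual database lookup, not something prescribed by Redis or multiprocessing.

    import json
    import os
    import redis
    from multiprocessing import Pool
    
    LIST_NAME = 'sensor_readings'   # placeholder name of the single shared list
    thresholds = {}                 # per-process copy: sensor_id -> thresholds
    
    def load_all_thresholds():
        # placeholder for reading all 3,000 sensors' thresholds from the database
        return {f'sensor_{i}': {'max_temperature': 100, 'max_rpm': 5000} for i in range(3000)}
    
    def init_worker():
        # runs once in every worker process before it starts consuming messages
        global thresholds
        thresholds = load_all_thresholds()
    
    def worker(_):
        r = redis.Redis()                   # one Redis connection per worker process
        while True:
            _, raw = r.blpop(LIST_NAME)     # blocks until any sensor pushes a reading
            reading = json.loads(raw)
            limits = thresholds[reading['sensor_id']]
            # ... run the CPU-bound calculations for this reading against limits here ...
    
    if __name__ == '__main__':
        n_workers = os.cpu_count() or 4
        with Pool(n_workers, initializer=init_worker) as pool:
            pool.map(worker, range(n_workers))   # each worker loops forever

    Because every message identifies its sensor, any worker can handle any reading, and for CPU-bound calculations the number of workers can simply match the number of cores.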