
Is it possible for multiple python files to use one queue?


One script (datamanager.py):

from multiprocessing import Manager
q = Manager().Queue()

The other two scripts look like this:

# producer script
from datamanager import q
import time

while True:
    time.sleep(1)
    q.put(1)

# consumer script
from datamanager import q

while True:
    if not q.empty():
        data = q.get()
        print(data)

Is it possible to achieve this using only a queue, rather than a message broker such as Kafka?
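For context on why the import-based version cannot work as written: each script that imports datamanager starts its own Manager process, so the two q objects live in two unrelated manager servers. A minimal, self-contained demonstration (it recreates the question's setup in a temporary directory and runs the producer and consumer as separate interpreters):

```python
# Each script that imports datamanager gets its OWN Manager process,
# so the two q objects are unrelated queues.
import os
import subprocess
import sys
import tempfile
import textwrap

with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "datamanager.py"), "w") as f:
        f.write(textwrap.dedent("""\
            from multiprocessing import Manager
            q = Manager().Queue()
        """))
    put = "from datamanager import q; q.put(1); print(q.qsize())"
    check = "from datamanager import q; print(q.empty())"
    results = [
        subprocess.run([sys.executable, "-c", code], cwd=d,
                       capture_output=True, text=True).stdout.strip()
        for code in (put, check)
    ]

# the first process sees the item it put; the second process sees an
# empty queue, because nothing was actually shared between them
print(results)
```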


Solution

  • In order for the queue to stay alive and not be tied to either process, you need to spawn a server that manages it. This server holds a singleton queue, and every process that contacts it gets a proxy to that queue. The server code looks as follows:

    # queue_server.py
    
    from multiprocessing.managers import SyncManager
    from multiprocessing.managers import BaseProxy
    import multiprocessing
    
    address = ('127.0.0.1', 50000)  # you can change this
    authkey = b"abc"  # you should change this
    
    class SingletonQueue:
        instance = None

        def __new__(cls, *args, **kwargs):
            # lazily create the one shared instance
            if cls.instance is None:
                cls.instance = super().__new__(cls)
            return cls.instance

        def get_queue(self):
            # created on first request: the server connects back to
            # itself to build a managed Queue that proxies can share
            if not hasattr(self, "queue"):
                manager = SyncManager(address=address, authkey=authkey)
                manager.connect()
                self.queue = manager.Queue()
            return self.queue
    
    class CustomQueueProxy(BaseProxy):
        _exposed_ = ['get_queue']
        def get_queue(self):
            queue = self._callmethod('get_queue')
            return queue
    
    
    def connect_manager():
        multiprocessing.current_process().authkey = authkey
    
        manager = SyncManager(address=address, authkey=authkey)
        manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy)
        manager.connect()
        return manager
    
    def start_server():
        manager = SyncManager(address=address, authkey=authkey)
        manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy)
        server = manager.get_server()
        print(f"running on ip = {server.address[0]}, and port {server.address[1]}")
        multiprocessing.current_process().authkey = authkey
        server.serve_forever()
    
    if __name__ == "__main__":
        start_server()
    

    you need to run the server first. After the server is running, you can connect to it with a client; the client code will look like this:

    import multiprocessing
    import queue_server  # the server python file
    
    manager = queue_server.connect_manager()
    queue: multiprocessing.Queue = manager.SingletonQueue().get_queue()
    queue.put(1)
    print(queue.get())
    

    Note that this sets the authentication key of your Python process to a fixed value, so you cannot use it to make multiple connections with different authentication keys; you have to stick with one fixed authentication key.
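A lighter-weight variant of the same idea is the "remote manager" pattern from the multiprocessing docs: register a callable that returns one shared queue, instead of a singleton class plus a custom proxy. A sketch (the port number and names are my own choices, not from the answer above):

```python
# Remote-manager sketch: the server owns a plain queue.Queue and hands
# out proxies to it via a registered callable.
import queue
import time
from multiprocessing import Process
from multiprocessing.managers import BaseManager

ADDRESS = ("127.0.0.1", 50001)  # assumed-free port, change as needed
AUTHKEY = b"abc"                # you should change this

def run_server():
    shared = queue.Queue()  # the one queue, owned by the server process
    class QueueManager(BaseManager):
        pass
    QueueManager.register("get_queue", callable=lambda: shared)
    manager = QueueManager(address=ADDRESS, authkey=AUTHKEY)
    manager.get_server().serve_forever()

def connect():
    class QueueManager(BaseManager):
        pass
    QueueManager.register("get_queue")  # no callable on the client side
    manager = QueueManager(address=ADDRESS, authkey=AUTHKEY)
    manager.connect()
    return manager.get_queue()  # a proxy to the server's queue

if __name__ == "__main__":
    Process(target=run_server, daemon=True).start()
    time.sleep(1)  # crude wait for the server to bind
    q = connect()
    q.put("hello")
    print(q.get())
```

Any number of producer and consumer scripts can call connect() and share the same queue, as long as they agree on the address and authkey.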

    Edit: I'd probably go with Charchit Agarwal's answer if anyone is reading this in the future, or a mix of both answers, depending on whether you want to allow connections across network/Docker boundaries, which my answer allows.