one script(datamanger.py)
from multiprocessing import Manager
q = Manager().Queue()
The other two scripts are like this
from datamanager import q
import time
while True:
time.sleep(1)
q.put(1)
from datamanager import q
while True:
if not q.empty():
data = q.get()
print(data)
Is it possible to realize the function only use queue instead of message queue such as kafka?
in order to have the queue alive and not tied to either process, you need to spawn a server that manages it, this server should have a singleton queue, and everyone that contacts it will get a proxy to this queue, the server code looks as follows:
# queue_server.py
from multiprocessing.managers import SyncManager
from multiprocessing.managers import BaseProxy
import multiprocessing
address = ('127.0.0.1', 50000) # you can change this
authkey = b"abc" # you should change this
class SingletonQueue:
instance = None
def __new__(cls, *args, **kwargs):
if SingletonQueue.instance is None:
SingletonQueue.instance = object.__new__(SingletonQueue)
return SingletonQueue.instance
else:
return SingletonQueue.instance
def get_queue(self):
if not hasattr(self, "queue"):
manager = SyncManager(address=address, authkey=authkey)
manager.connect()
self.queue = manager.Queue()
return self.queue
class CustomQueueProxy(BaseProxy):
_exposed_ = ['get_queue']
def get_queue(self):
queue = self._callmethod('get_queue')
return queue
def connect_manager():
multiprocessing.current_process().authkey = authkey
manager = SyncManager(address=address, authkey=authkey)
manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy)
manager.connect()
return manager
def start_server():
manager = SyncManager(address=address, authkey=authkey)
manager.register("SingletonQueue", SingletonQueue, CustomQueueProxy)
server = manager.get_server()
print(f"running on ip = {server.address[0]}, and port {server.address[1]}")
multiprocessing.current_process().authkey = authkey
server.serve_forever()
if __name__ == "__main__":
start_server()
you need to run the server, after running the server you can connect to it with a client, the client code will look like this:
import multiprocessing
import queue_server # the server python file
manager = queue_server.connect_manager()
queue: multiprocessing.Queue = manager.SingletonQueue().get_queue()
queue.put(1)
print(queue.get())
note that this sets the authentication key of your python process to a certain value, so you cannot use it for doing multiple connections with different authentication keys, you have to have a fixed authentication key.
Edit: i'd probably go with Charchit Agarwal answer if anyone is reading this in the future, or a mix of both answers. depending on whether you want to allow connection over network/docker boundaries, which my answer allows.