Tags: python, publish-subscribe, message-bus

Publish-Subscribe and a Message Bus in Python


I'm trying to create a central logging system for some of my Python modules. I want to send log messages from a number of modules and have a central logger receive and process them.


For simplicity, I want my module A to look something like this:

  import time

  bus = connect_to_a_bus_that_is_always_there
  while True:
    # Publish a message to the message bus (pseudocode)
    bus.publish(topic="logs.a", message="example")
    time.sleep(1)

and the logger (the only subscriber) to look like this:

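import time

def actOnNewMessage(msg):
  # Dispatch on the part of the topic after "logs."
  if msg.topic.subtopic == "a":
    doSomethingForA(msg.data)

bus = connect_to_a_bus_that_is_always_there
bus.subscribe("logs", handler=actOnNewMessage)

while True:
  # Wait for messages (pseudocode: the handler fires whenever one arrives)
  time.sleep(1)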

Right now the Logger module acts like a library, so it does not run persistently. Maybe I can introduce something between the Logger and the message bus that constantly watches for new messages.

I have looked at PyPubSub, but its documentation doesn't seem to cover persistent communication between separately running Python modules. If anyone has tried this, it would work for me as long as I can use it between different modules.
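To illustrate the limitation, here is a minimal sketch of an in-process bus with dotted topic prefixes. The class `InProcessBus` and its methods are hypothetical names invented for this example and are not PyPubSub's API. The point is that the handler table lives inside a single interpreter, so a publisher running in another process can never reach it.

```python
from collections import defaultdict


class InProcessBus:
    """Hypothetical in-process bus: handlers exist in this interpreter only."""

    def __init__(self):
        self._handlers = defaultdict(list)  # topic prefix -> list of handlers

    def subscribe(self, topic, handler):
        self._handlers[topic].append(handler)

    def publish(self, topic, data):
        # Deliver to every subscription that equals the topic or is a
        # dotted ancestor of it, so "logs" also receives "logs.a".
        parts = topic.split(".")
        for i in range(1, len(parts) + 1):
            prefix = ".".join(parts[:i])
            for handler in self._handlers.get(prefix, []):
                handler(topic, data)


received = []


def act_on_new_message(topic, data):
    # Dispatch on the part of the topic after the first dot.
    subtopic = topic.split(".", 1)[1] if "." in topic else ""
    received.append((subtopic, data))


bus = InProcessBus()
bus.subscribe("logs", act_on_new_message)
bus.publish("logs.a", "example")
bus.publish("metrics.a", "ignored")  # nobody subscribed to "metrics"
print(received)  # [('a', 'example')]
```

Anything that has to cross a process boundary needs a transport (sockets, pipes, a broker) underneath this kind of dispatch.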

Another catch is that I might end up with modules not written in Python, so I don't really want direct communication between modules A, B, and the Logger. In the end, my architecture might look like this: [image: architecture diagram]

I hope the information above is not confusing.

tl;dr: I need publish-subscribe with a persistent message bus in Python and a subscriber that constantly waits for new messages. Is there a ready-to-use solution?

EDIT: I'm considering running a WebSocket server that knows about the Logger module, with the other modules A and B knowing the address of that server. Are there any drawbacks to this design?
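The shape of that design can be sketched with the standard library alone. This is a rough illustration rather than a recommendation: it uses plain TCP with one JSON object per line instead of the WebSocket protocol, and the names `LogHandler` and `publish` as well as the message fields `topic` and `data` are assumptions made for the example.

```python
import json
import socket
import socketserver
import threading
import time

received = []  # (topic, data) pairs seen by the logger


class LogHandler(socketserver.StreamRequestHandler):
    def handle(self):
        # One JSON object per line, e.g. {"topic": "logs.a", "data": "example"}
        for line in self.rfile:
            msg = json.loads(line)
            if msg["topic"].startswith("logs."):
                received.append((msg["topic"], msg["data"]))


def publish(address, topic, data):
    """Client side: any language that can open a TCP socket can do the same."""
    with socket.create_connection(address) as conn:
        payload = json.dumps({"topic": topic, "data": data}) + "\n"
        conn.sendall(payload.encode("utf-8"))


# Port 0 lets the OS pick a free port for this demo; a real logger
# would listen on a fixed address that modules A and B know.
server = socketserver.ThreadingTCPServer(("127.0.0.1", 0), LogHandler)
threading.Thread(target=server.serve_forever, daemon=True).start()

publish(server.server_address, "logs.a", "example")

# Give the handler thread a moment to process the line.
deadline = time.time() + 2
while not received and time.time() < deadline:
    time.sleep(0.01)

server.shutdown()
server.server_close()
print(received)
```

The sketch also makes the main drawbacks visible: the server is a single point of failure, and anything a module sends while the logger is down is lost unless the module buffers and retries on its own.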


Solution

  • I've come across nanomsg. It suits my needs perfectly: it is MIT-licensed and requires no additional server process. In addition, there are bindings for every language I would like to use.

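    # Publisher (e.g. module A)
    from nanomsg import Socket, PUB
    
    s = Socket(PUB)
    s.connect('tcp://localhost:8080')
    # Messages are bytes; the subscriber filters on the leading "topic" prefix.
    s.send(b'topicMessage')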
    

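    # Subscriber (the logger)
    from nanomsg import Socket, SUB, SUB_SUBSCRIBE
    
    s = Socket(SUB)
    # One endpoint must bind; the single logger binds so every publisher can connect to it.
    s.bind('tcp://127.0.0.1:8080')
    s.set_string_option(SUB, SUB_SUBSCRIBE, b"topic")
    while True:
        print(s.recv())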
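A nanomsg SUB socket filters by comparing each subscription against the leading bytes of the message, and it delivers the whole message, topic included. The receiver therefore has to split topic and payload itself. Below is one possible framing convention, sketched without nanomsg so it runs anywhere; the `|` separator and the helper names `encode`, `decode`, and `matches` are assumptions made for this example, not part of the library.

```python
SEPARATOR = b"|"  # assumed delimiter between topic and payload


def encode(topic, payload):
    """Build the bytes to hand to a PUB socket's send()."""
    return topic.encode("utf-8") + SEPARATOR + payload.encode("utf-8")


def decode(raw):
    """Split the bytes returned by a SUB socket's recv() into (topic, payload)."""
    topic, _, payload = raw.partition(SEPARATOR)
    return topic.decode("utf-8"), payload.decode("utf-8")


def matches(subscription, raw):
    """Mimic the prefix test a SUB socket applies to incoming messages."""
    return raw.startswith(subscription)


raw = encode("logs.a", "example")
print(raw)                    # b'logs.a|example'
print(matches(b"logs", raw))  # True
print(decode(raw))            # ('logs.a', 'example')
```

With this convention the logger would subscribe to `b"logs"` and call `decode` on each received message to pick the handler for the subtopic.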