STOMP Broadcast with RabbitMQ and Python


I'm trying to move a system from Morbid to RabbitMQ, but I cannot get the same broadcast behaviour that Morbid supplied by default. By broadcast I mean that when a message is added to the queue, every consumer receives it. With RabbitMQ, messages are instead distributed round-robin among the listeners, so each message reaches only one of them.

Can anyone tell me how to achieve the same kind of message distribution?

The stomp library used below is http://code.google.com/p/stomppy/

If this can't be done with STOMP, even an amqplib example would really help.

My code at present looks like this

The Consumer

import time

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print('received an error %s' % message)

    def on_message(self, headers, message):
        print('received a message %s' % message)

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")

conn.subscribe(destination='/topic/demoqueue', ack='auto')

# Sleep instead of spinning so the main thread does not burn CPU
# while the listener thread handles incoming messages.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    conn.disconnect()

And the sender looks like this

import time

import stomp

class MyListener(object):
    def on_error(self, headers, message):
        print('received an error %s' % message)

    def on_message(self, headers, message):
        print('received a message %s' % message)

conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'user', 'password')
conn.set_listener('', MyListener())
conn.start()
conn.connect(username="user", password="password")

conn.subscribe(destination='/topic/demotopic', ack='auto')

# Sleep instead of spinning so the main thread does not burn CPU.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    conn.disconnect()

Solution

  • I finally figured out how to do it by creating an exchange for each "receiving group". I'm not sure how well RabbitMQ will cope with thousands of exchanges, so you might want to test this heavily before trying it in production.

    In the sending code:

    conn.send(str(i), exchange=exchange, destination='')
    

    The blank destination is required; all I care about is sending to that exchange.

    To receive:

    import sys
    import time

    import stomp
    from amqplib import client_0_8 as amqp

    # Read in the exchange name so I can set up multiple receivers for different exchanges to test
    exchange = sys.argv[1]
    amqp_conn = amqp.Connection(host="localhost:5672", userid="username", password="password",
                                virtual_host="/", insist=False)

    chan = amqp_conn.channel()

    chan.access_request('/', active=True, write=True, read=True)

    # Declare my exchange
    chan.exchange_declare(exchange, 'topic')
    # Not passing a queue name means I get a new unique one back
    qname, _, _ = chan.queue_declare()
    # Bind the queue to the exchange
    chan.queue_bind(qname, exchange=exchange)

    class MyListener(object):
        def on_error(self, headers, message):
            print('received an error %s' % message)

        def on_message(self, headers, message):
            print('received a message %s' % message)

    conn = stomp.Connection([('0.0.0.0', 61613), ('127.0.0.1', 61613)], 'browser', 'browser')
    conn.set_listener('', MyListener())
    conn.start()
    conn.connect(username="username", password="password")

    # Subscribe to the queue
    conn.subscribe(destination=qname, ack='auto')

    # Sleep instead of spinning so the main thread does not burn CPU
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        conn.disconnect()
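
To see why giving each receiver its own queue changes the behaviour, here is a toy in-memory model of the two setups. It is only a sketch and does not talk to RabbitMQ at all: `SharedQueue`, `Exchange`, `bind_new_queue` and `demo` are made-up names for illustration, and routing keys are ignored entirely.

```python
from itertools import cycle


class SharedQueue(object):
    """Hypothetical model: one queue shared by several consumers.

    Each message is handed to exactly one consumer, in round-robin order.
    """

    def __init__(self, consumers):
        self._next_consumer = cycle(consumers)

    def publish(self, message):
        next(self._next_consumer).append(message)


class Exchange(object):
    """Hypothetical model: an exchange with one private queue per receiver.

    Every bound queue gets its own copy of each published message.
    """

    def __init__(self):
        self._queues = []

    def bind_new_queue(self):
        # Mirrors declaring an unnamed queue and binding it to the exchange.
        queue = []
        self._queues.append(queue)
        return queue

    def publish(self, message):
        for queue in self._queues:
            queue.append(message)


def demo():
    # Two consumers on one shared queue: messages are split between them.
    a, b = [], []
    shared = SharedQueue([a, b])
    for i in range(4):
        shared.publish(i)

    # Two receivers, each with its own queue bound to the same exchange.
    exchange = Exchange()
    c = exchange.bind_new_queue()
    d = exchange.bind_new_queue()
    for i in range(4):
        exchange.publish(i)

    return (a, b), (c, d)
```

Calling `demo()` returns the messages each consumer ended up with. In the shared-queue case the two consumers see `[0, 2]` and `[1, 3]`, while in the exchange case both see `[0, 1, 2, 3]`, which is the broadcast behaviour the question asks for.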