Search code examples
pythonrabbitmqrpckombu

Implementing an RPC client with kombu for RabbitMQ and its direct reply-to feature


I'm implementing an RPC framework with kombu to be used with RabbitMQ. I want to use RabbitMQ's direct reply-to feature, but I can't find a way to implement it with kombu.

The client needs to consume on the 'amq.rabbitmq.reply-to' queue before producing its message, and the producer and consumer should use the same channel. I also need to use a producer pool (or some sort of connection pool) because clients are created in a threaded environment.

So far I have this code, rabbitmq won't complain about no producer with PRECONDITION error (If I remove the consumer part, it does complain), but the producer won't produce anything!

class KombuRpcClient(RpcClientBase):
    """RPC client that publishes requests through kombu and waits for a reply.

    Replies are expected on ``self.reply_queue`` (routing key
    ``'amq.rabbitmq.reply-to'``). ``_on_message`` hands each reply body to
    ``call`` through the ``self.future`` queue.
    """

    def __init__(self, params):
        """Store the connection settings and build the reply queue object.

        ``params`` is either an ``RpcConnectionProperties`` instance, which is
        used as-is, or a mapping that is read for ``host``, ``username``,
        ``password`` and, optionally, ``vhost``.
        """
        # Hand-off point between the consumer callback and call().
        self.future = Queue.Queue()
        self.logger = logger
        if isinstance(params, RpcConnectionProperties):
            self.rpc_connection_properties = params
        else:
            # The port is fixed at 5672 here; vhost falls back to '/' when
            # the key is absent from params.
            self.rpc_connection_properties = RpcConnectionProperties(
                host=params.get('host'),
                port=5672,
                username=params.get('username'),
                password=params.get('password'),
                vhost=params.get('vhost') if params.has_key('vhost') else '/'
            )
        self.amqp_url = self.rpc_connection_properties.get_kombu_transport_url('pyamqp')
        # The queue object is named 'direct_reply' but routed with the
        # direct reply-to pseudo-queue name.
        self.reply_queue = KombuQueue('direct_reply', exchange=default_exchange, routing_key='amq.rabbitmq.reply-to')

    def call(self, exchange, key, msg, no_response=False, timeout=5):
        """Publish ``msg`` and wait up to ``timeout`` seconds for a reply.

        When ``exchange`` is not None the routing key becomes
        ``exchange + ':' + key``; the message itself is always published to
        ``default_exchange``. ``no_response`` is not used by this
        implementation. The reply is printed rather than returned, and
        ``Queue.Empty`` propagates if nothing arrives within ``timeout``.
        """
        connection = Connection(self.amqp_url)
        if exchange is not None:
            key = exchange + ':' + key
        with producers_pool[connection].acquire(block=True) as producer:
            # The consumer is created on the producer's own channel so that
            # both share it.
            with producer.channel.Consumer(queues=[self.reply_queue], no_ack=True, callbacks=[self._on_message],
                                           accept=['ujson']) as consumer:
                producer.publish(
                    msg,
                    exchange=default_exchange,
                    routing_key=key,
                    # NOTE(review): RabbitMQ 3.0+ does not implement the
                    # `immediate` flag -- confirm this is not what prevents
                    # the message from being delivered.
                    immediate=True,
                    serializer='ujson', reply_to=self.reply_queue.routing_key)
                # NOTE(review): nothing in this method calls drain_events(),
                # so the callback may never get a chance to run -- confirm.
                consumer.consume()
                pass
        # Blocks until _on_message has put a body on the queue.
        res = self.future.get(block=True, timeout=timeout)
        print res

    def cast(self, exchange, key, msg):
        """Placeholder; does nothing in this version."""
        pass

    def _on_message(self, body, message):
        """Consumer callback: print the reply body and pass it to ``call``."""
        print body
        self.future.put(body)

Solution

  • With a little help from Wireshark I realised that sometimes RabbitMQ responds with that PRECONDITION error but kombu won't raise the exception, and I don't know why! Anyway, this code is working now:

    class KombuRpcClient(RpcClientBase):
        """RPC client using kombu and RabbitMQ's direct reply-to feature.

        ``call`` publishes a request with ``reply_to='amq.rabbitmq.reply-to'``
        and waits for the reply on the same channel that published it;
        ``cast`` publishes without waiting for any reply.
        """

        def __init__(self, params):
            """Store the connection settings and build the reply queue object.

            ``params`` is either an ``RpcConnectionProperties`` instance, which
            is used as-is, or a mapping that is read for ``host``,
            ``username``, ``password`` and, optionally, ``vhost`` (defaults
            to ``'/'``). The port is always 5672.
            """
            # Hand-off point between the consumer callback and call().
            self.future = Queue.Queue()
            self.logger = logger
            if isinstance(params, RpcConnectionProperties):
                self.rpc_connection_properties = params
            else:
                self.rpc_connection_properties = RpcConnectionProperties(
                    host=params.get('host'),
                    port=5672,
                    username=params.get('username'),
                    password=params.get('password'),
                    vhost=params.get('vhost', '/'),
                )
            self.amqp_url = self.rpc_connection_properties.get_kombu_transport_url('pyamqp')
            self.reply_queue = KombuQueue('amq.rabbitmq.reply-to', exchange=default_exchange,
                                          routing_key='amq.rabbitmq.reply-to')

        def call(self, exchange, key, msg, no_response=False, timeout=5):
            """Publish ``msg`` and return the body of the reply.

            When ``exchange`` is not None the routing key becomes
            ``exchange + ':' + key``; the message itself is always published
            to ``default_exchange``. ``no_response`` is accepted for interface
            compatibility and is not used.

            Waits at most ``timeout`` seconds for the broker to deliver the
            reply (``None`` waits indefinitely). Raises ``Queue.Empty`` when
            no reply has been received by then.
            """
            # Local import: only needed for the timeout handling below.
            import socket

            connection = Connection(self.amqp_url)
            if exchange is not None:
                key = exchange + ':' + key

            with producers_pool[connection].acquire(block=True) as producer:
                # Direct reply-to requires the consumer to be registered, in
                # no-ack mode, on the very channel that publishes the request.
                consumer = producer.channel.Consumer(
                    queues=[self.reply_queue], no_ack=True, auto_declare=True,
                    callbacks=[self._on_message], accept=['ujson'])
                # Entering the consumer starts consuming; leaving it cancels
                # the consumer so callbacks do not pile up on the pooled
                # channel across calls.
                with consumer:
                    producer.publish(msg,
                                     serializer='ujson',
                                     exchange=default_exchange,
                                     routing_key=key,
                                     reply_to='amq.rabbitmq.reply-to')
                    try:
                        # Bound the wait: without a timeout this blocks
                        # forever when the server never answers.
                        consumer.connection.drain_events(timeout=timeout)
                    except socket.timeout:
                        # Fall through so the caller sees Queue.Empty, the
                        # same exception the queue wait has always raised.
                        pass
                    return self.future.get(block=False)

        def cast(self, exchange, key, msg):
            """Publish ``msg`` without waiting for a reply.

            The routing key is built the same way as in ``call``.
            """
            connection = Connection(self.amqp_url)
            if exchange is not None:
                key = exchange + ':' + key
            with producers_pool[connection].acquire(block=True) as producer:
                producer.publish(msg,
                                 serializer='ujson',
                                 exchange=default_exchange,
                                 routing_key=key)

        def _on_message(self, body, message):
            """Consumer callback: print the reply body and pass it to ``call``."""
            # Parenthesised single argument prints identically under the
            # Python 2 print statement and also parses under Python 3.
            print(body)
            self.future.put(body)

    

    BTW: sorry about the variable names; this is supposed to replace an old STOMP RPC client, so I had to keep the names for compatibility.