Tags: python, mocking, rabbitmq, python-unittest, pika

How to mock a pika connection for a different module?


I have a class that imports the following module:

import pika
import pickle
from apscheduler.schedulers.background import BackgroundScheduler
import time
import logging

class RabbitMQ():
    def __init__(self):
        self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
        self.channel = self.connection.channel()
        self.sched = BackgroundScheduler()
        self.sched.add_job(self.keep_connection_alive, id='clean_old_data', trigger='cron',
                           hour='*', minute='*', second='*/50')
        self.sched.start()

    def publish_message(self, message, path="path"):
        message["path"] = path
        logging.info(message)
        message = pickle.dumps(message)
        self.channel.basic_publish(exchange="", routing_key="server", body=message)

    def keep_connection_alive(self):
        self.connection.process_data_events()

rabbitMQ = RabbitMQ()
def publish_message(message, path="path"):
    rabbitMQ.publish_message(message, path=path)

My class.py:

import RabbitMQ as rq
class MyClass():
...

When I write unit tests for MyClass, I can't mock the connection for this part of the code. The tests keep throwing the following exception and don't run at all:

pika.exceptions.ConnectionClosed: Connection to 127.0.0.1:5672 failed: [Errno 111] Connection refused

I tried a couple of approaches to mock this connection, but none of them seems to work. What can I do to support this sort of test? Should I mock the entire RabbitMQ module, or only the connection?


Solution

  • As the commenter above mentions, the issue is that your RabbitMQ instance is created globally, at module level.

    My knee-jerk reaction is to say "just get rid of that, and your module-level publish_message". If you can do that, go for that solution. You have a publish_message on your RabbitMQ class that accepts the same args; any caller would then be expected to create an instance of your RabbitMQ class.

    If you don't want to or can't do that for whatever reason, you should just move that object instantiation into your module-level publish_message like this:

    def publish_message(message , path="path"):
        rabbitMQ = RabbitMQ()
        rabbitMQ.publish_message(message, path=path)
    

    This will create a new connection every time you call it though. Maybe that's ok...but maybe it's not. So to avoid creating duplicate connections, you'd want to introduce something like a singleton pattern:

    class RabbitMQ():
        __instance = None
    
        ...
    
        @classmethod
        def get_instance(cls):
            if cls.__instance is None:
                cls.__instance = RabbitMQ()
            return cls.__instance
    
    def publish_message(message , path="path"):
        RabbitMQ.get_instance().publish_message(message, path=path)
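
To illustrate why the lazy singleton above makes the connection mockable, here is a minimal, self-contained sketch. It assumes a stand-in `pika` namespace (a `types.SimpleNamespace`, not the real library) so that it runs without a broker, and it uses a single-underscore `_instance` instead of `__instance` so a test can reset the cached object. Both are choices made for this illustration, not part of the original answer, and the scheduler from the question is left out.

```python
import pickle
import types
from unittest import mock


def _refuse(*args, **kwargs):
    # Mimics a broker that is not running.
    raise ConnectionError("Connection refused")


# ASSUMPTION: stand-in for the real `pika` module, used only so this runs offline.
pika = types.SimpleNamespace(BlockingConnection=_refuse)


class RabbitMQ:
    _instance = None  # single underscore so tests can reset it

    def __init__(self):
        # Nothing here runs until get_instance() is first called.
        self.connection = pika.BlockingConnection()
        self.channel = self.connection.channel()

    @classmethod
    def get_instance(cls):
        if cls._instance is None:
            cls._instance = cls()
        return cls._instance

    def publish_message(self, message, path="path"):
        message["path"] = path
        self.channel.basic_publish(
            exchange="", routing_key="server", body=pickle.dumps(message)
        )


def publish_message(message, path="path"):
    RabbitMQ.get_instance().publish_message(message, path=path)


# In a test: install the patch first, then trigger the lazy construction.
with mock.patch.object(pika, "BlockingConnection") as fake_connection:
    RabbitMQ._instance = None  # start from a clean slate
    publish_message({"id": 1}, path="/jobs")

    fake_channel = fake_connection.return_value.channel.return_value
    fake_channel.basic_publish.assert_called_once()

    RabbitMQ._instance = None  # do not leak the mock-backed instance
```

Because nothing connects at import time, the patch is already in place when the first `get_instance()` call builds the object. With the real library you would presumably patch `pika.BlockingConnection` as seen from your RabbitMQ module, and the scheduler started in `__init__` would likely need patching as well.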
    

    Ideally though, you'd want to avoid the singleton pattern entirely. Whichever caller needs it should store a single instance of your RabbitMQ object and call publish_message on it directly.

    So the TLDR/ideal solution IMO: Just get rid of those last 3 lines of your RabbitMQ module (the global rabbitMQ instance and the module-level publish_message). The caller should create a RabbitMQ object.
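
A minimal sketch of what "the caller holds the RabbitMQ object" could look like in a test. The body of MyClass is elided in the question, so the constructor argument `publisher` and the method `do_work` are hypothetical names invented for this illustration, and the `RabbitMQ` class here is a stand-in that only mimics a refused connection.

```python
from unittest import mock


class RabbitMQ:
    """Stand-in for the question's class; the real one connects in __init__."""

    def __init__(self):
        raise ConnectionError("Connection refused")  # mimics a missing broker

    def publish_message(self, message, path="path"):
        raise NotImplementedError  # never reached in the test below


class MyClass:
    # HYPOTHETICAL shape: the publisher is handed in instead of being
    # pulled from a module-level global.
    def __init__(self, publisher):
        self.publisher = publisher

    def do_work(self):
        self.publisher.publish_message({"status": "done"}, path="/results")


# The test passes a mock with the same interface; RabbitMQ() is never called,
# so no connection is attempted.
publisher = mock.Mock(spec=RabbitMQ)
MyClass(publisher).do_work()

publisher.publish_message.assert_called_once_with(
    {"status": "done"}, path="/results"
)
```

Production code would pass a real `RabbitMQ()` instance in the same place, so the test needs no patching at all.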

    EDIT: Oh, and as for why it's happening: when you import that module, rabbitMQ = RabbitMQ() is evaluated immediately. Your attempt to mock it happens after that line has already run, so the real connection attempt is made and fails.
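
The ordering problem can be reproduced in isolation. The sketch below is an illustration under stated assumptions: `fake_pika` is a made-up stand-in module (not the real pika), and `exec` on a source string simulates importing a module that connects at import time.

```python
import sys
import types
from unittest import mock

# Reduced stand-in for the RabbitMQ module: one statement that runs on import.
MODULE_SOURCE = """
import fake_pika
connection = fake_pika.BlockingConnection()  # evaluated during import
"""


def _refuse(*args, **kwargs):
    # Mimics a broker that is not running.
    raise ConnectionError("Connection refused")


# ASSUMPTION: `fake_pika` is a hypothetical stand-in for the real library.
fake_pika = types.ModuleType("fake_pika")
fake_pika.BlockingConnection = _refuse
sys.modules["fake_pika"] = fake_pika


def import_module_under_test():
    # Executes the module body the same way an import statement would.
    module = types.ModuleType("rabbit_module")
    exec(MODULE_SOURCE, module.__dict__)
    return module


# 1. No patch in place yet: the module body runs and the connection fails.
try:
    import_module_under_test()
    import_failed = False
except ConnectionError:
    import_failed = True

# 2. Patch first, import second: the module body sees the mock.
with mock.patch.object(fake_pika, "BlockingConnection") as fake_connection:
    module = import_module_under_test()

patched_import_ok = module.connection is fake_connection.return_value

sys.modules.pop("fake_pika", None)  # tidy up the stand-in module
```

For the code in the question, the equivalent would be to have the patch active before the first `import RabbitMQ` runs anywhere in the test process, which is fragile. That is one more reason to prefer removing the module-level instance.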